gabriel / muse public
git_primitives.py python
587 lines 18.6 KB
Raw
sha256:84df9126d09aeec0b8f1b908f0b06c10913feec28f3514b382efb1ba6d619385 refactor: rename StructuredMergePlugin to AddressedMergePlu… Sonnet 4.6 minor ⚠ breaking 23 days ago
1 """Bridge git primitives — low-level git subprocess wrappers.
2
3 All functions that shell out to ``git`` live here. Nothing in this module
4 knows about Muse commits or snapshots — it only translates git wire output
5 into Python dicts and bytes.
6
7 Exports
8 -------
9 GitCommitInfo, GitTagInfo TypedDicts for git log / tag output
10 _DEFAULT_EXCLUDE_PREFIXES/SUFFIXES Import filter constants
11 _should_exclude Path exclusion helper
12 AttributionMapper email → Muse handle mapper
13 _validate_git_sha 40-char hex guard
14 _validate_git_branch_name Shell-safe branch name guard
15 _CatFile Long-running git cat-file process
16 _git One-shot git subprocess
17 _batch_commit_log Bulk commit log reader
18 _batch_diff_tree Commit → changed-files map
19 _is_lfs_pointer LFS pointer detector
20 _strip_ansi, _parse_sem_ver_bump Message helpers
21 _list_git_branches, _list_git_tags Local branch / tag listers
22 """
23
24 from __future__ import annotations
25
26 import hashlib
27 import pathlib
28 import re
29 import subprocess
30 from typing import TypedDict
31
32 from muse.core.types import load_json_file
33
34
35 # ---------------------------------------------------------------------------
36 # Git-specific TypedDicts
37 # ---------------------------------------------------------------------------
38
# Type alias for the result of :func:`_batch_diff_tree`.
FileDiffTree = dict[str, str | None]
"""Mapping from POSIX path to blob SHA-1 (``None`` = deleted)."""
41
42
class GitCommitInfo(TypedDict):
    """One commit record returned by ``git log`` via :func:`_batch_commit_log`."""

    sha: str  # commit hash (``%H``)
    parent_shas: list[str]  # parent hashes (``%P``), whitespace-split; empty for a root commit
    author_email: str  # author email (``%ae``)
    author_name: str  # author name (``%an``)
    author_date: str  # author date, strict ISO-8601 (``%aI``)
    subject: str  # subject line (``%s``)
    body: str  # message body (``%b``), surrounding whitespace stripped
    is_merge: bool  # True when the commit has more than one parent
54
55
class GitTagInfo(TypedDict):
    """One tag descriptor returned by :func:`_list_git_tags`."""

    name: str  # short tag name (``%(refname:short)``)
    sha: str  # abbreviated object name (``%(objectname:short)``); "" if absent from the output line
    is_semver: bool  # True when the name starts with ``[v]<digits>.<digits>.<digits>``
62
63
64 # ---------------------------------------------------------------------------
65 # Import path exclusion
66 # ---------------------------------------------------------------------------
67
# Directory prefixes that are never imported.  :func:`_should_exclude` treats
# each entry as matching either at the start of a path or directly after any
# ``/`` inside it, so nested occurrences are excluded as well.
_DEFAULT_EXCLUDE_PREFIXES: list[str] = [
    ".git/",
    "node_modules/",
    ".venv/",
    ".env/",
    "__pycache__/",
    ".tox/",
    ".mypy_cache/",
    ".pytest_cache/",
    "dist/",
    "build/",
    ".eggs/",
]
81
# Path suffixes that are never imported.  :func:`_should_exclude` compares
# each entry against the end of the full path with ``str.endswith``.
_DEFAULT_EXCLUDE_SUFFIXES: list[str] = [
    ".pyc",
    ".pyo",
    ".DS_Store",
    ".swp",
    ".swo",
    ".orig",
    ".rej",
    ".class",
    ".o",
    ".obj",
    ".so",
    ".dll",
    ".exe",
    ".log",
]
98
99
def _should_exclude(path: str, extra_patterns: list[str] | None = None) -> bool:
    """Decide whether *path* must be left out of an import.

    A path is excluded when any of the following holds:

    * it starts with, or contains after a ``/``, one of the directory
      prefixes in :data:`_DEFAULT_EXCLUDE_PREFIXES`;
    * it ends with one of :data:`_DEFAULT_EXCLUDE_SUFFIXES`;
    * it contains one of the caller-supplied *extra_patterns* as a substring.

    Args:
        path: POSIX-relative file path to test.
        extra_patterns: Optional additional substrings that trigger exclusion.

    Returns:
        ``True`` if the path should be excluded from the import.
    """
    in_excluded_dir = any(
        path.startswith(prefix) or f"/{prefix}" in path
        for prefix in _DEFAULT_EXCLUDE_PREFIXES
    )
    if in_excluded_dir:
        return True

    # ``str.endswith`` accepts a tuple, so one call covers every suffix.
    if path.endswith(tuple(_DEFAULT_EXCLUDE_SUFFIXES)):
        return True

    # ``None`` and an empty list both mean "no extra patterns".
    return any(pattern in path for pattern in extra_patterns or ())
126
127
128 # ---------------------------------------------------------------------------
129 # Security validation
130 # ---------------------------------------------------------------------------
131
# Allow-list for branch names: ASCII letters, digits and ``_ . - /`` only.
# Anchored with ``\Z`` rather than ``$`` because ``$`` also matches just
# before a trailing newline, which would let ``"main\n"`` through.
_BRANCH_SAFE_RE = re.compile(r"^[a-zA-Z0-9_.\-/]+\Z")

# SHA-1 hex validation — exactly 40 lowercase hex characters, nothing after
# them (``\Z`` instead of ``$`` so a trailing newline is rejected).
_GIT_SHA_RE = re.compile(r"^[0-9a-f]{40}\Z")

# Characters banned in branch names passed to subprocess: shell
# metacharacters, backslash, space, and ASCII control characters.
# Inside this raw string ``\\`` is one literal backslash; the space is listed
# literally (the previous ``\\s`` banned the letter "s" and allowed spaces).
_BRANCH_UNSAFE_CHARS_RE = re.compile(r'[;&|`$()\\ \x00-\x1f\x7f]')
139
140
def _validate_git_sha(sha: str) -> bool:
    """Return True only if *sha* is a valid 40-character lowercase hex SHA-1.

    The whole string must match: ``fullmatch`` is used instead of ``match``
    because a ``$`` anchor also matches immediately before a trailing
    newline, which would accept ``"<40 hex>\\n"``.  Such a value written to a
    line-oriented git process would be read as two requests.

    Args:
        sha: String to validate.

    Returns:
        ``True`` for exactly 40 hex digits ``[0-9a-f]``, ``False`` otherwise.
    """
    return _GIT_SHA_RE.fullmatch(sha) is not None
151
152
# Characters that must never appear in a branch name handed to git: shell
# metacharacters, backslash, space, every ASCII control character and DEL.
# Built once at import time instead of on every validation call.
_BRANCH_DANGEROUS_CHARS = frozenset(";& |`$()\\") | frozenset(
    chr(code) for code in (*range(0x00, 0x20), 0x7F)
)


def _validate_git_branch_name(branch: str) -> bool:
    """Return True only if *branch* is safe to pass to a git subprocess.

    Rejects:

    * the empty string;
    * any name starting with ``-`` — git would parse it as a command-line
      option rather than a ref (git itself forbids a leading dash in branch
      names, so no legitimate branch is affected);
    * any name containing shell metacharacters (``;``, ``&``, ``|``,
      backtick, ``$``, ``(``, ``)``), a backslash, a space, or an ASCII
      control character (including DEL).

    Args:
        branch: Branch name string to validate.

    Returns:
        ``True`` when the name contains no dangerous characters, ``False`` otherwise.
    """
    if not branch:
        return False
    # Option-injection guard: "--output=..." must never reach git as a rev.
    if branch.startswith("-"):
        return False
    return _BRANCH_DANGEROUS_CHARS.isdisjoint(branch)
169
170
171 # ---------------------------------------------------------------------------
172 # Attribution mapper
173 # ---------------------------------------------------------------------------
174
class AttributionMapper:
    """Resolve git author emails to Muse handles.

    Handles come from an optional JSON attribution file (``email → handle``).
    Any email without an entry gets a synthetic, reproducible handle of the
    form ``git-import/<sha256[:8]>`` derived from the email address, so every
    imported commit has a stable author identity even without a full map.

    Args:
        map_file: Path to the JSON attribution file, or ``None`` to run with
            an empty map.

    Attributes:
        missed_emails: Emails looked up via :meth:`get_handle` that had no
            entry in the map.
    """

    def __init__(self, map_file: pathlib.Path | None) -> None:
        """Populate the email → handle map from *map_file* when one is given."""
        self._map: dict[str, str] = {}
        self.missed_emails: set[str] = set()

        if map_file is None:
            return

        loaded = load_json_file(map_file)
        if not isinstance(loaded, dict):
            # Anything other than a JSON object is ignored; the map stays empty.
            return

        for email, handle in loaded.items():
            if not (isinstance(email, str) and isinstance(handle, str)):
                continue
            # Drop ASCII control characters (below space, and DEL) from handles.
            self._map[email] = "".join(
                ch for ch in handle if not (ch < " " or ch == "\x7f")
            )

    def get_handle(self, email: str, name: str = "") -> str:
        """Look up the Muse handle for *email*, synthesising one if unmapped.

        Args:
            email: Git author email address.
            name: Git author display name.  Accepted for interface
                compatibility; it does not influence the result.

        Returns:
            The mapped handle when *email* is in the attribution map;
            otherwise ``git-import/<first 8 hex chars of sha256(email)>``,
            with *email* recorded in :attr:`missed_emails`.
        """
        mapped = self._map.get(email)
        if mapped is not None:
            return mapped

        self.missed_emails.add(email)
        short_digest = hashlib.sha256(email.encode()).hexdigest()[:8]
        return f"git-import/{short_digest}"
226
227
228 # ---------------------------------------------------------------------------
229 # _CatFile — long-running git cat-file process
230 # ---------------------------------------------------------------------------
231
class _CatFile:
    """Long-running ``git cat-file --batch`` process manager.

    Keeps a single ``git cat-file --batch`` subprocess alive for the duration
    of an import run so per-blob subprocess startup overhead is eliminated.
    Each call to :meth:`read` sends one SHA-1 to the process's stdin and reads
    the response header + body.

    The ``git cat-file --batch`` wire protocol::

        ← send:    ``<sha>\\n``
        → receive: ``<sha> <type> <size>\\n``
                   ``<size-bytes-of-content>``
                   ``\\n``  (1-byte separator after content)

    Usable as a context manager; leaving the ``with`` block calls
    :meth:`close`.

    Args:
        git_dir: Path to the git repository root (containing ``.git/``).
    """

    def __init__(self, git_dir: pathlib.Path) -> None:
        """Start the git cat-file process with piped stdin and stdout."""
        self._proc = subprocess.Popen(
            ["git", "-C", str(git_dir), "cat-file", "--batch"],
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
        )

    def read(self, sha: str) -> bytes:
        """Read one git object by SHA-1 and return its raw content bytes.

        Args:
            sha: 40-character git SHA-1 hash (lowercase hex).

        Returns:
            Raw content bytes of the object.  ``b""`` is returned when the
            process reports the object as missing, when no header line is
            received, or when the header does not have three fields.  If the
            stream ends before ``<size>`` bytes arrive, the bytes read so far
            are returned.

        Raises:
            ValueError: If *sha* is not a valid 40-character hex string (or
                the size field of the header is not an integer).
            RuntimeError: If the process was started without stdin/stdout
                pipes.
        """
        if not _validate_git_sha(sha):
            raise ValueError(
                f"Invalid git SHA-1: {sha!r}. "
                "Must be exactly 40 lowercase hexadecimal characters."
            )
        stdin, stdout = self._proc.stdin, self._proc.stdout
        # Explicit check instead of ``assert`` so it survives ``python -O``.
        if stdin is None or stdout is None:
            raise RuntimeError("git cat-file process has no stdin/stdout pipes")

        stdin.write(f"{sha}\n".encode())
        stdin.flush()

        header = stdout.readline().decode("utf-8", errors="replace").strip()
        if not header or header.endswith(" missing"):
            return b""
        parts = header.rsplit(" ", 2)
        if len(parts) < 3:
            return b""

        remaining = int(parts[2])
        # Collect chunks and join once; repeated ``bytes +=`` copies the
        # accumulated buffer on every iteration.
        chunks: list[bytes] = []
        while remaining > 0:
            chunk = stdout.read(min(remaining, 65536))
            if not chunk:
                break  # stream ended early
            chunks.append(chunk)
            remaining -= len(chunk)
        # Consume the trailing newline separator
        stdout.read(1)
        return b"".join(chunks)

    def close(self) -> None:
        """Shut the git cat-file process down and release its pipes.

        Closing stdin signals end-of-input; the process is then given five
        seconds to exit.  On any failure it is killed and reaped.  The stdout
        pipe is always closed so no file descriptor is leaked.  Safe to call
        more than once.
        """
        proc = self._proc
        try:
            if proc.stdin:
                proc.stdin.close()
            proc.wait(timeout=5)
        except Exception:  # noqa: BLE001 - best-effort cleanup
            proc.kill()
            proc.wait()  # reap the killed child so it does not linger as a zombie
        finally:
            if proc.stdout:
                proc.stdout.close()

    def __enter__(self) -> "_CatFile":
        return self

    def __exit__(self, *_: object) -> None:
        self.close()
313
314
315 # ---------------------------------------------------------------------------
316 # Low-level git wrappers
317 # ---------------------------------------------------------------------------
318
def _git(git_dir: pathlib.Path, *args: str, check: bool = True) -> str:
    """Execute ``git -C <git_dir> <args...>`` once and return its stdout.

    Both stdout and stderr are captured; only stdout is returned.

    Args:
        git_dir: Path to the git repository root.
        *args: Arguments to pass to git.
        check: If True, raise ``CalledProcessError`` on non-zero exit.

    Returns:
        stdout decoded as UTF-8; undecodable bytes are substituted by the
        ``"replace"`` error handler instead of raising.
    """
    command = ["git", "-C", str(git_dir)]
    command.extend(args)
    completed = subprocess.run(command, capture_output=True, check=check)
    stdout_bytes: bytes = completed.stdout
    return stdout_bytes.decode("utf-8", errors="replace")
336
337
def _batch_commit_log(
    git_dir: pathlib.Path,
    branch: str,
    from_sha: str | None = None,
) -> list[GitCommitInfo]:
    """Return an oldest-first list of commit dicts for *branch*.

    Uses ``git log --format=...`` with NUL / record-separator delimiters so
    commit messages containing newlines or spaces are parsed correctly.

    Each dict contains:
        ``sha``, ``parent_shas`` (list), ``author_email``, ``author_name``,
        ``author_date`` (ISO-8601), ``subject``, ``body``, ``is_merge`` (bool).

    Records whose hash field is not exactly 40 characters long, or that have
    fewer than six fields, are skipped.

    Args:
        git_dir: Path to the git repository root.
        branch: Branch name or ref to log.
        from_sha: If given, only return commits reachable from *branch* but
            not from *from_sha* (exclusive — *from_sha* itself omitted).

    Returns:
        Commits in oldest-first order (ready for replay).  An empty list is
        returned when ``git log`` exits non-zero.
    """
    # Fields are NUL-separated; each record ends with RS (0x1e).
    fmt = "%H%x00%P%x00%ae%x00%an%x00%aI%x00%s%x00%b%x1e"
    rev_range = f"{from_sha}..{branch}" if from_sha else branch

    # NOTE(review): *branch* / *from_sha* are interpolated verbatim —
    # presumably callers validate them first; confirm.
    try:
        # The trailing "--" marks the end of revisions.  Without it git
        # refuses to run when *branch* is also the name of a path in the
        # working tree ("ambiguous argument"), and the import would silently
        # see zero commits.
        raw = _git(
            git_dir, "log", f"--format={fmt}", "--reverse", rev_range, "--"
        )
    except subprocess.CalledProcessError:
        return []

    records: list[GitCommitInfo] = []
    for record in raw.split("\x1e"):
        record = record.strip()
        if not record:
            continue
        # maxsplit=6 keeps any NUL inside the body attached to the body field.
        parts = record.split("\x00", 6)
        if len(parts) < 6:
            continue
        sha, parents_str, email, name, date, subject = parts[:6]
        body = parts[6] if len(parts) > 6 else ""
        sha = sha.strip()
        if len(sha) != 40:
            continue
        parent_shas = parents_str.split()
        records.append({
            "sha": sha,
            "parent_shas": parent_shas,
            "author_email": email.strip(),
            "author_name": name.strip(),
            "author_date": date.strip(),
            "subject": subject.strip(),
            "body": body.strip(),
            "is_merge": len(parent_shas) > 1,
        })
    return records
396
397
def _ls_tree_blob_map(git_dir: pathlib.Path, sha: str) -> dict[str, str]:
    """Return ``{path: object_sha}`` for every entry of ``git ls-tree -r <sha>``.

    Returns an empty dict when the git command exits non-zero.
    """
    try:
        raw = _git(git_dir, "ls-tree", "-r", "--format=%(objectname) %(path)", sha)
    except subprocess.CalledProcessError:
        return {}
    blobs: dict[str, str] = {}
    for line in raw.splitlines():
        line = line.strip()
        if not line:
            continue
        # The object name never contains a space, so the first space
        # separates it from the (possibly space-containing) path.
        object_name, sep, path = line.partition(" ")
        if sep:
            blobs[path] = object_name
    return blobs


def _batch_diff_tree(
    git_dir: pathlib.Path,
    sha: str,
    parent_sha: str | None = None,
) -> FileDiffTree:
    """Return ``{path: blob_sha | None}`` for files changed in commit *sha*.

    When *parent_sha* is ``None`` (genesis commit), returns all files in the
    tree.  ``blob_sha=None`` means the file was deleted in this commit.

    Status handling for ``git diff-tree --name-status`` lines:

    * ``D`` — path mapped to ``None``;
    * ``A`` / ``M`` / ``T`` — path mapped to its blob in *sha*'s tree;
    * ``R`` — old path mapped to ``None``, new path mapped to its blob;
    * ``C`` — only the new path is recorded; the copy source still exists
      and is left untouched.

    NOTE(review): neither git command is run with ``-z``, so git may emit
    unusual path names in its quoted form (see ``core.quotePath``); such keys
    would then carry the quoting — confirm whether callers need ``-z``.

    Args:
        git_dir: Path to the git repository root.
        sha: Commit SHA-1 to inspect.
        parent_sha: Parent SHA-1, or ``None`` for the root commit.

    Returns:
        A mapping from POSIX path to blob SHA-1 (or ``None`` for deletions).
        Empty when the underlying git command fails.
    """
    if parent_sha is None:
        return dict(_ls_tree_blob_map(git_dir, sha))

    try:
        raw = _git(
            git_dir,
            "diff-tree", "--no-commit-id", "-r", "--name-status", "--diff-filter=ACDMRT",
            parent_sha, sha,
        )
    except subprocess.CalledProcessError:
        return {}

    added_modified: list[str] = []
    deleted: list[str] = []

    for line in raw.splitlines():
        line = line.strip()
        if not line:
            continue
        parts = line.split("\t", 2)
        if len(parts) < 2:
            continue
        # Only the first letter matters: R/C carry a similarity score suffix.
        status = parts[0][0]
        if status == "D":
            deleted.append(parts[1])
        elif status in ("A", "M", "T"):
            added_modified.append(parts[1])
        elif status in ("R", "C") and len(parts) >= 3:
            # A rename removes the source path; a copy leaves it in place.
            if status == "R":
                deleted.append(parts[1])
            added_modified.append(parts[2])

    # Deletions first so that a path both vacated and re-created in the same
    # commit ends up with its new blob.
    result: dict[str, str | None] = {path: None for path in deleted}

    if added_modified:
        blob_map = _ls_tree_blob_map(git_dir, sha)
        for path in added_modified:
            if path in blob_map:
                result[path] = blob_map[path]

    return result
482
483
484 # ---------------------------------------------------------------------------
485 # Miscellaneous helpers
486 # ---------------------------------------------------------------------------
487
def _is_lfs_pointer(content: bytes) -> bool:
    """Tell whether *content* is a git LFS pointer file.

    The check is purely on the leading bytes: a pointer begins with the
    version line ``version https://git-lfs.github.com/spec/``.

    Args:
        content: Raw file bytes to test.

    Returns:
        ``True`` if this is an LFS pointer, ``False`` otherwise.
    """
    marker = b"version https://git-lfs.github.com/spec/"
    return content[: len(marker)] == marker
501
502
# CSI sequences of the form ESC [ <digits/semicolons> <final>, where the
# final byte is one of m, G, K, H or F.
_ANSI_ESCAPE_RE = re.compile(r"\x1b\[[0-9;]*[mGKHF]")


def _strip_ansi(s: str) -> str:
    """Return *s* with every sequence matched by ``_ANSI_ESCAPE_RE`` removed."""
    # Splitting on the escape pattern leaves only the visible text pieces.
    visible_pieces = _ANSI_ESCAPE_RE.split(s)
    return "".join(visible_pieces)
509
510
def _parse_sem_ver_bump(message: str) -> str:
    """Parse conventional commit prefix and return the semver bump level.

    Rules (in priority order):
    1. ``BREAKING CHANGE`` anywhere in the message, or a ``!`` directly
       before the colon of the type prefix — with or without a scope, e.g.
       ``feat!:`` or ``feat(api)!:`` → ``major``
    2. ``feat:`` / ``feat(scope):`` prefix → ``minor``
    3. ``fix:`` / ``perf:`` prefix (optionally scoped) → ``patch``
    4. Anything else → ``none``

    Args:
        message: Full commit message (subject + body).

    Returns:
        One of ``"major"``, ``"minor"``, ``"patch"``, or ``"none"``.
    """
    first_line = message.splitlines()[0] if message else ""

    # The optional ``(scope)`` group makes ``type(scope)!:`` count as
    # breaking; previously only the unscoped ``type!:`` form was recognised
    # and scoped breaking commits fell through to "none".
    if "BREAKING CHANGE" in message or re.match(r"^\w+(\([^()]+\))?!:", first_line):
        return "major"
    if re.match(r"^feat(\(.+\))?:", first_line):
        return "minor"
    if re.match(r"^(fix|perf)(\(.+\))?:", first_line):
        return "patch"
    return "none"
535
536
537 # ---------------------------------------------------------------------------
538 # Git ref listers
539 # ---------------------------------------------------------------------------
540
def _list_git_branches(git_dir: pathlib.Path) -> list[str]:
    """Return the list of local git branch names in *git_dir*.

    Branches are enumerated with ``git for-each-ref refs/heads/`` rather than
    ``git branch``: when HEAD is detached, ``git branch --format`` also emits
    a ``(HEAD detached at ...)`` pseudo-entry, which is not a branch name.

    Args:
        git_dir: Path to the git repository root.

    Returns:
        A list of branch name strings (e.g. ``["main", "develop"]``).  Empty
        when the git command exits non-zero.
    """
    try:
        raw = _git(git_dir, "for-each-ref", "--format=%(refname:short)", "refs/heads/")
    except subprocess.CalledProcessError:
        return []
    return [name for name in (line.strip() for line in raw.splitlines()) if name]
555
556
def _list_git_tags(git_dir: pathlib.Path) -> list[GitTagInfo]:
    """Enumerate the tags of the repository at *git_dir*.

    Each returned dict has keys:
        ``name`` (str), ``sha`` (str), ``is_semver`` (bool).

    ``sha`` is the abbreviated object name printed by
    ``%(objectname:short)``.
    NOTE(review): being abbreviated, it is not a 40-character SHA-1 —
    confirm callers do not feed it to helpers that require one.

    Args:
        git_dir: Path to the git repository root.

    Returns:
        A list of tag descriptor dicts; empty when the git command exits
        non-zero.
    """
    semver_pattern = re.compile(r"^v?\d+\.\d+\.\d+")

    try:
        raw = _git(git_dir, "tag", "-l", "--format=%(refname:short) %(objectname:short)")
    except subprocess.CalledProcessError:
        return []

    tags: list[GitTagInfo] = []
    for entry in (line.strip() for line in raw.splitlines()):
        if not entry:
            continue
        # "name sha" — when no space is present the sha comes back empty.
        name, _, sha = entry.partition(" ")
        tags.append({
            "name": name,
            "sha": sha,
            "is_semver": semver_pattern.match(name) is not None,
        })
    return tags
File History 1 commit
sha256:84df9126d09aeec0b8f1b908f0b06c10913feec28f3514b382efb1ba6d619385 refactor: rename StructuredMergePlugin to AddressedMergePlu… Sonnet 4.6 minor 23 days ago