gabriel / musehub public
magic_bytes.py python
178 lines 6.6 KB
Raw
sha256:5667a3e21bf16fd2e6d6bd4a769bd1c0cf7634afa12cef6450cc77573196b7f9 asyncpg caps query parameters Human patch 8 days ago
1 """Magic-bytes file type validation.
2
3 Prevents polyglot file attacks where a file is served under a trusted
4 extension (e.g. ``.jpg``) but its actual content is something else (e.g.
5 a PHP script, a ZIP archive, or an HTML file containing JavaScript).
6
7 ``check_magic_bytes(path, content)`` returns the detected type string and
8 raises ``PolyglotFileError`` when the declared extension does not match
9 the content's magic bytes.
10
11 Supported type detection (first-bytes signatures):
12 MIDI — MThd header
13 MP3 — ID3 tag or MPEG sync word
14 WebP — RIFF….WEBP container
15 PNG — PNG signature
16 JPEG — JPEG SOI marker
17 ZIP — PK signature (also covers .muse objects if ever zipped)
18 PDF — %PDF header
HTML — starts with <!DOCTYPE or <html (matched case-insensitively)
20 Script — starts with #! (shebang)
21 JSON / XML are validated by structure, not magic bytes.
22
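Files whose extension is not in the known-type map are not checked against
an expected signature (content-addressed storage of arbitrary source files
must not be blocked). They are still rejected when their content looks like
HTML, or starts with a shebang under an extension where one is not expected.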
26 """
27
28 from collections.abc import Callable
29 from pathlib import Path
30
# Predicate over raw file bytes: returns True when the bytes match a format.
type _Checker = Callable[[bytes], bool]
# (label, predicate) pair; the label is the name reported in results and
# error messages for the format the predicate recognises.
type _CheckerEntry = tuple[str, _Checker]
33
class PolyglotFileError(ValueError):
    """Signal that a file's content contradicts its declared extension.

    Subclasses ``ValueError``, so handlers written for generic value errors
    also catch it.
    """
36
37 # ── Magic byte signatures ─────────────────────────────────────────────────────
38
39 # (offset, expected_bytes) tuples. offset is the byte position to match at.
40 _MIDI_MAGIC = [(0, b"MThd")]
41 _MP3_ID3 = [(0, b"ID3")]
42 _MP3_SYNC_1 = [(0, bytes([0xFF, 0xFB]))]
43 _MP3_SYNC_2 = [(0, bytes([0xFF, 0xF3]))]
44 _MP3_SYNC_3 = [(0, bytes([0xFF, 0xF2]))]
45 _WEBP_RIFF = [(0, b"RIFF"), (8, b"WEBP")]
46 _PNG_MAGIC = [(0, bytes([0x89, 0x50, 0x4E, 0x47, 0x0D, 0x0A, 0x1A, 0x0A]))]
47 _JPEG_MAGIC = [(0, bytes([0xFF, 0xD8, 0xFF]))]
48 _ZIP_MAGIC = [(0, bytes([0x50, 0x4B, 0x03, 0x04]))]
49 _PDF_MAGIC = [(0, b"%PDF")]
50 _HTML_DOCTYPE = [(0, b"<!DOCTYPE"), (0, b"<!doctype")]
51 _HTML_TAG = [(0, b"<html"), (0, b"<HTML")]
52 _SHEBANG = [(0, b"#!")]
53
54 def _match_any(content: bytes, checks: list[tuple[int, bytes]]) -> bool:
55 """Return True if content matches ALL (offset, expected) pairs in *checks*."""
56 for offset, expected in checks:
57 end = offset + len(expected)
58 if len(content) < end:
59 return False
60 if content[offset:end] != expected:
61 return False
62 return True
63
def _is_midi(content: bytes) -> bool:
    """Report whether *content* satisfies the ``_MIDI_MAGIC`` signature (``MThd`` at offset 0)."""
    return _match_any(content=content, checks=_MIDI_MAGIC)
66
67 def _is_mp3(content: bytes) -> bool:
68 return (
69 _match_any(content, _MP3_ID3)
70 or _match_any(content, _MP3_SYNC_1)
71 or _match_any(content, _MP3_SYNC_2)
72 or _match_any(content, _MP3_SYNC_3)
73 )
74
75 def _is_webp(content: bytes) -> bool:
76 return (
77 len(content) >= 12
78 and content[0:4] == b"RIFF"
79 and content[8:12] == b"WEBP"
80 )
81
def _is_png(content: bytes) -> bool:
    """Report whether *content* satisfies the ``_PNG_MAGIC`` signature (8-byte PNG header)."""
    return _match_any(content=content, checks=_PNG_MAGIC)
84
def _is_jpeg(content: bytes) -> bool:
    """Report whether *content* satisfies the ``_JPEG_MAGIC`` signature (``FF D8 FF`` at offset 0)."""
    return _match_any(content=content, checks=_JPEG_MAGIC)
87
def _is_zip(content: bytes) -> bool:
    """Report whether *content* satisfies the ``_ZIP_MAGIC`` signature (``PK\\x03\\x04`` at offset 0)."""
    return _match_any(content=content, checks=_ZIP_MAGIC)
90
def _is_pdf(content: bytes) -> bool:
    """Report whether *content* satisfies the ``_PDF_MAGIC`` signature (``%PDF`` at offset 0)."""
    return _match_any(content=content, checks=_PDF_MAGIC)
93
94 def _looks_like_html(content: bytes) -> bool:
95 head = content[:16].lower()
96 return head.startswith(b"<!doctype") or head.startswith(b"<html")
97
98 def _is_shebang(content: bytes) -> bool:
99 return content[:2] == b"#!"
100
101 # ── Per-extension expected checker ────────────────────────────────────────────
102
# Lower-case extension → (display_name, checker_fn). Each row lists every
# extension spelling of one format next to the entry they share. Extensions
# absent from this map are not validated against a signature.
_EXTENSION_CHECKERS: dict[str, _CheckerEntry] = {
    extension: entry
    for extensions, entry in (
        ((".mid", ".midi"), ("MIDI", _is_midi)),
        ((".mp3",), ("MP3", _is_mp3)),
        ((".webp",), ("WebP", _is_webp)),
        ((".png",), ("PNG", _is_png)),
        ((".jpg", ".jpeg"), ("JPEG", _is_jpeg)),
        ((".zip",), ("ZIP", _is_zip)),
        ((".pdf",), ("PDF", _is_pdf)),
    )
    for extension in extensions
}
116
# Content that is NEVER acceptable inside any uploaded binary file regardless
# of extension — catches the most dangerous polyglot attacks.
#
# Entries are evaluated in list order. The label strings are not only display
# text: check_magic_bytes compares them literally ("HTML/JS", "shebang") to
# decide which extension-based exemptions apply, so renaming a label here
# requires updating that function as well.
_FORBIDDEN_CHECKERS: list[_CheckerEntry] = [
    ("HTML/JS", _looks_like_html),
    ("shebang", _is_shebang),
]
123
124 # Extensions where a shebang (#!) is legitimate and must not be flagged.
125 # Script/text files routinely start with #!/usr/bin/env <interpreter>.
126 _SHEBANG_ALLOWED_EXTENSIONS: frozenset[str] = frozenset({
127 ".py", ".sh", ".bash", ".zsh", ".rb", ".pl", ".r",
128 ".js", ".ts", ".mjs", ".cjs",
129 ".lua", ".php", ".tcl", ".awk",
130 ".bats", # BATS (Bash Automated Testing System) — starts with #!/usr/bin/env bats
131 })
132
def check_magic_bytes(path: str, content: bytes) -> str:
    """Check that the leading bytes of *content* agree with *path*'s extension.

    The check runs in two stages. First, every forbidden-content checker is
    applied regardless of extension, with two exemptions: HTML is accepted
    under ``.html`` / ``.htm``, and a shebang is accepted under the script
    extensions in ``_SHEBANG_ALLOWED_EXTENSIONS``. Second, if the extension
    has a registered signature checker, the content must satisfy it.

    Args:
        path: The file path or filename. Only its final suffix is used,
            compared case-insensitively.
        content: The raw file bytes. Only a short leading prefix is examined.

    Returns:
        ``"empty"`` for empty content, ``"HTML"`` for HTML content under an
        HTML extension, ``"unknown"`` when the extension has no registered
        checker, otherwise the display name of the matched type (e.g.
        ``"MIDI"``, ``"MP3"``).

    Raises:
        PolyglotFileError: When the content matches a forbidden pattern that
            is not exempt for this extension, or when the extension implies
            a known type whose signature the content does not match.
    """
    # Empty content carries no signature to contradict the extension.
    if not content:
        return "empty"

    suffix = Path(path).suffix.lower()

    # Stage 1: content signatures that are forbidden unless exempted.
    for forbidden_label, matches in _FORBIDDEN_CHECKERS:
        if not matches(content):
            continue
        # HTML content is legitimate when the file is declared as HTML.
        if forbidden_label == "HTML/JS" and suffix in (".html", ".htm"):
            return "HTML"
        # A shebang is legitimate in script/text files; keep checking the
        # remaining forbidden patterns rather than returning.
        if forbidden_label == "shebang" and suffix in _SHEBANG_ALLOWED_EXTENSIONS:
            continue
        raise PolyglotFileError(
            f"File '{path}' has {suffix!r} extension but content looks like "
            f"{forbidden_label} — possible polyglot attack."
        )

    # Stage 2: signature expected for this particular extension, if any.
    registered = _EXTENSION_CHECKERS.get(suffix)
    if registered is None:
        return "unknown"  # no check for this extension — allow through

    type_name, matches_type = registered
    if matches_type(content):
        return type_name
    raise PolyglotFileError(
        f"File '{path}' has {suffix!r} extension but magic bytes do not match "
        f"{type_name} format — possible polyglot file."
    )
File History 1 commit
sha256:5667a3e21bf16fd2e6d6bd4a769bd1c0cf7634afa12cef6450cc77573196b7f9 asyncpg caps query parameters Human patch 8 days ago