about summary refs log tree commit diff stats
path: root/httpdump.py
diff options
context:
space:
mode:
authorLouis Burda <dev@sinitax.com>2026-01-23 21:41:09 +0100
committerLouis Burda <dev@sinitax.com>2026-01-23 21:41:09 +0100
commit65a71e1d9f04e930cbaa5a312395f0fb63c7eef9 (patch)
treec0785f125f72d5ab10a89e3c7e086037f25ef871 /httpdump.py
downloadhttpdump-main.tar.gz
httpdump-main.zip
Add initial version (HEAD, main)
Diffstat (limited to 'httpdump.py')
-rw-r--r--httpdump.py170
1 file changed, 170 insertions(+), 0 deletions(-)
diff --git a/httpdump.py b/httpdump.py
new file mode 100644
index 0000000..68f6a26
--- /dev/null
+++ b/httpdump.py
@@ -0,0 +1,170 @@
+#!/usr/bin/env python3
+import base64
+import json
+import os
+import select
+import sys
+from urllib.parse import parse_qs, urlparse
+
+import click
+
+
def decode(b: bytes, encoding: str = "ascii") -> str:
    """Best-effort decode of *b* to text.

    Returns the decoded string, or "" when the bytes are not valid in
    *encoding* or *b* is not a bytes-like object (AttributeError path).
    """
    try:
        text = b.decode(encoding)
    except (UnicodeDecodeError, AttributeError):
        text = ""
    return text
+
+
def parse_multipart(body: bytes, boundary: bytes) -> dict:
    """Parse a multipart/form-data body into {field name: value}.

    Plain fields map to their decoded text; file fields map to
    {"filename": ..., "content": ...}.  Parts with no header/body
    separator or no field name are skipped silently.
    """
    parts = {}
    for part in body.split(b"--" + boundary):
        part = part.strip()
        # Skip the empty preamble and the closing "--" terminator.
        if not part or part == b"--":
            continue
        # Headers and content are separated by a blank line; tolerate
        # bare-LF bodies as well as the spec-mandated CRLF.
        if b"\r\n\r\n" in part:
            headers_raw, content = part.split(b"\r\n\r\n", 1)
        elif b"\n\n" in part:
            headers_raw, content = part.split(b"\n\n", 1)
        else:
            continue

        headers = {}
        for line in headers_raw.split(b"\n"):
            if b": " in line:
                k, v = line.strip().split(b": ", 1)
                headers[decode(k).lower()] = decode(v)

        disposition = headers.get("content-disposition", "")
        name = None
        filename = None
        for item in disposition.split(";"):
            item = item.strip()
            if item.startswith("name="):
                name = item[5:].strip('"')
            elif item.startswith("filename="):
                filename = item[9:].strip('"')

        if name:
            # Trim only the delimiter CR/LF.  The previous rstrip set
            # included "-", which corrupted values ending in hyphens.
            content = content.rstrip(b"\r\n")
            if filename:
                parts[name] = {"filename": filename, "content": decode(content, "utf-8")}
            else:
                parts[name] = decode(content, "utf-8")
    return parts
+
+
+def parse_body(body: bytes, content_type: str | None) -> dict:
+ result = {"raw_base64": base64.b64encode(body).decode("ascii")}
+ if body and content_type:
+ if "application/json" in content_type:
+ try:
+ result["json"] = json.loads(body)
+ except (json.JSONDecodeError, UnicodeDecodeError):
+ pass
+ elif "application/x-www-form-urlencoded" in content_type:
+ try:
+ result["form"] = {k: v[0] if len(v) == 1 else v for k, v in parse_qs(decode(body, "utf-8")).items()}
+ except:
+ pass
+ elif "multipart/form-data" in content_type:
+ for part in content_type.split(";"):
+ if "boundary=" in part:
+ boundary = part.split("boundary=", 1)[1].strip().encode()
+ result["multipart"] = parse_multipart(body, boundary)
+ break
+ return result
+
+
def read_http_message(fd: int) -> tuple[bytes, bytes]:
    """Read one HTTP message from file descriptor *fd*.

    Returns (header_part, body): the raw header bytes (without the
    blank-line separator) and as much of the body as could be read.
    If no complete header section ever arrives, everything read so far
    is returned as header_part with an empty body.
    """
    buf = b""
    # Wait up to 30s for the first data, then only 100ms between chunks.
    timeout = 30.0
    while b"\r\n\r\n" not in buf:
        if not select.select([fd], [], [], timeout)[0]:
            break  # timed out waiting for more header bytes
        chunk = os.read(fd, 4096)
        if not chunk:
            break  # EOF
        buf += chunk
        timeout = 0.1

    if b"\r\n\r\n" not in buf:
        return buf, b""

    header_part, _, body = buf.partition(b"\r\n\r\n")
    # Parse headers just enough to discover Content-Length.
    headers = {}
    for line in header_part.split(b"\r\n")[1:]:
        if b": " in line:
            k, v = line.split(b": ", 1)
            headers[decode(k).lower()] = decode(v)

    # Best-effort read of the remaining body: stop after a 100ms quiet
    # period or EOF rather than blocking forever.
    # NOTE(review): int() raises ValueError on a malformed Content-Length
    # header value — confirm whether that crash is acceptable here.
    remaining = int(headers.get("content-length", 0)) - len(body)
    while remaining > 0:
        if not select.select([fd], [], [], 0.1)[0]:
            break
        chunk = os.read(fd, remaining)
        if not chunk:
            break
        body += chunk
        remaining -= len(chunk)

    return header_part, body
+
+
def parse_message(header_part: bytes, body: bytes, is_response: bool) -> dict:
    """Convert raw header bytes plus body into a JSON-serializable dict.

    For responses: status_code/status_text/headers/body.  For requests:
    method/url/path/query_params/headers/body.  Headers carry both the
    raw base64 bytes and a parsed name->value mapping.
    """
    raw_lines = header_part.split(b"\r\n")
    start = decode(raw_lines[0]).split(" ", 2)

    parsed = {}
    for raw in raw_lines[1:]:
        if b": " not in raw:
            continue
        key, value = raw.split(b": ", 1)
        parsed[decode(key)] = decode(value)

    headers = {
        "raw_base64": base64.b64encode(b"\r\n".join(raw_lines[1:])).decode("ascii"),
        "parsed": parsed,
    }
    content_type = parsed.get("Content-Type")

    if is_response:
        return {
            "status_code": int(start[1]) if len(start) > 1 else 0,
            "status_text": start[2] if len(start) > 2 else "",
            "headers": headers,
            "body": parse_body(body, content_type),
        }

    target = start[1] if len(start) > 1 else "/"
    url = urlparse(target)
    query = {k: v[0] if len(v) == 1 else v for k, v in parse_qs(url.query).items()}
    return {
        "method": start[0],
        "url": target,
        "path": url.path,
        "query_params": query,
        "headers": headers,
        "body": parse_body(body, content_type),
    }
+
+
@click.command()
@click.option("--response", "-r", is_flag=True, help="Parse as HTTP response instead of request")
@click.argument("file", required=False, type=click.Path(exists=True))
def main(response: bool, file: str | None):
    """Parse HTTP requests and responses into JSON."""
    if file:
        with open(file, "rb") as f:
            raw = f.read()
        # partition() yields (raw, b"", b"") when the separator is absent,
        # i.e. the whole input is treated as headers with an empty body.
        header_part, _, body = raw.partition(b"\r\n\r\n")
    else:
        header_part, body = read_http_message(sys.stdin.fileno())

    result = parse_message(header_part, body, response)
    print(json.dumps(result, indent=2), flush=True)
    # Hard exit skips interpreter teardown (output is already flushed above).
    os._exit(0)
+
+
# Entry point: invoke the click CLI only when run as a script.
if __name__ == "__main__":
    main()