#!/usr/bin/env python3
"""Parse a raw HTTP request or response into a JSON document.

The message is read either from a file given on the command line or from
standard input.  The start line, headers and body are split apart; the body
is always emitted base64-encoded and, when the Content-Type allows it, also
as decoded JSON, URL-encoded form fields or multipart form parts.
"""
import base64
import json
import os
import select
import sys
from urllib.parse import parse_qs, urlparse

import click

# Byte sequence separating the header section from the body.
HEADER_END = b"\r\n\r\n"
# Seconds to wait for the first bytes to show up on the input descriptor.
INITIAL_READ_TIMEOUT = 30.0
# Seconds to wait for more bytes once some data has already arrived.
IDLE_READ_TIMEOUT = 0.1
# Maximum number of bytes requested per read while looking for the headers.
READ_CHUNK_SIZE = 4096


def decode(b: bytes, encoding: str = "ascii") -> str:
    """Decode *b* using *encoding*, returning "" when it cannot be decoded.

    An empty string is also returned when *b* has no ``decode`` method
    (for example when it is ``None``).
    """
    try:
        return b.decode(encoding)
    except (UnicodeDecodeError, AttributeError):
        return ""


def _parse_header_lines(lines, *, lowercase_names: bool = False) -> dict:
    """Turn an iterable of raw ``Name: value`` byte lines into a dict.

    Lines without a colon, or whose name is empty or not ASCII, are skipped.
    Whitespace around the name and the value is removed, so both
    ``Name: value`` and ``Name:value`` are accepted.  When a name occurs
    more than once the last occurrence wins.
    """
    fields = {}
    for line in lines:
        raw_name, colon, raw_value = line.partition(b":")
        if not colon:
            continue
        name = decode(raw_name.strip())
        if not name:
            continue
        if lowercase_names:
            name = name.lower()
        fields[name] = decode(raw_value.strip())
    return fields


def _flatten_query(query: str) -> dict:
    """Parse a query string, unwrapping values that occur exactly once."""
    return {
        key: values[0] if len(values) == 1 else values
        for key, values in parse_qs(query).items()
    }


def _split_part(segment: bytes):
    """Split one multipart segment into ``(raw_headers, content)``.

    Returns ``None`` when the segment has no blank line separating headers
    from content.  CRLF separators are preferred over bare LF ones.
    """
    for separator in (HEADER_END, b"\n\n"):
        raw_headers, found, content = segment.partition(separator)
        if found:
            return raw_headers, content
    return None


def _extract_boundary(content_type: str):
    """Return the ``boundary`` parameter of a Content-Type value as bytes.

    Surrounding double quotes are removed.  Returns ``None`` when the
    parameter is missing or empty.
    """
    for param in content_type.split(";")[1:]:
        key, equals, value = param.partition("=")
        if equals and key.strip().lower() == "boundary":
            boundary = value.strip().strip('"')
            return boundary.encode() if boundary else None
    return None


def parse_multipart(body: bytes, boundary: bytes) -> dict:
    """Parse a ``multipart/form-data`` body into a dict keyed by field name.

    Args:
        body: The raw message body.
        boundary: The boundary token, without the leading ``--``.

    Returns:
        A dict mapping each part's ``name`` to its UTF-8 decoded content.
        Parts that carry a ``filename`` map to
        ``{"filename": ..., "content": ...}`` instead.  Content that is not
        valid UTF-8 decodes to "".  Parts without a name are ignored.
    """
    parts = {}
    for segment in body.split(b"--" + boundary):
        # "--boundary--" closes the message; whatever follows is epilogue.
        if segment.startswith(b"--"):
            break
        split = _split_part(segment)
        if split is None:
            continue
        raw_headers, content = split

        headers = _parse_header_lines(
            raw_headers.split(b"\n"), lowercase_names=True
        )
        name = None
        filename = None
        for item in headers.get("content-disposition", "").split(";"):
            item = item.strip()
            if item.startswith("name="):
                name = item[5:].strip('"')
            elif item.startswith("filename="):
                filename = item[9:].strip('"')
        if not name:
            continue

        # Only the single line break preceding the next delimiter belongs to
        # the framing; any other trailing bytes are part of the field value.
        if content.endswith(b"\r\n"):
            content = content[:-2]
        elif content.endswith(b"\n"):
            content = content[:-1]

        text = decode(content, "utf-8")
        if filename:
            parts[name] = {"filename": filename, "content": text}
        else:
            parts[name] = text
    return parts


def parse_body(body: bytes, content_type: str | None) -> dict:
    """Describe a message body according to its Content-Type.

    Args:
        body: The raw body bytes.
        content_type: The Content-Type header value, or ``None``.

    Returns:
        A dict that always contains ``raw_base64``.  Depending on the media
        type it may additionally contain ``json``, ``form`` or
        ``multipart``.  Bodies that fail to parse are left out silently; the
        raw bytes remain available through ``raw_base64``.
    """
    result = {"raw_base64": base64.b64encode(body).decode("ascii")}
    if not body or not content_type:
        return result

    # Media types and parameter names are case-insensitive.
    media_type = content_type.lower()
    if "application/json" in media_type:
        try:
            result["json"] = json.loads(body)
        except (json.JSONDecodeError, UnicodeDecodeError):
            pass
    elif "application/x-www-form-urlencoded" in media_type:
        try:
            result["form"] = _flatten_query(decode(body, "utf-8"))
        except ValueError:
            # Best effort: an unparsable form is simply not reported.
            pass
    elif "multipart/form-data" in media_type:
        boundary = _extract_boundary(content_type)
        if boundary is not None:
            result["multipart"] = parse_multipart(body, boundary)
    return result


def read_http_message(fd: int) -> tuple[bytes, bytes]:
    """Read one HTTP message from file descriptor *fd*.

    Data is read until the blank line ending the headers is seen, waiting up
    to ``INITIAL_READ_TIMEOUT`` seconds for the first bytes and
    ``IDLE_READ_TIMEOUT`` seconds between subsequent reads.  The body is then
    read until ``Content-Length`` bytes have arrived, the input goes idle, or
    end of file is reached.

    Returns:
        ``(header_part, body)``.  When no header terminator was found,
        everything that was read is returned as the header part and the body
        is empty.
    """
    buf = b""
    timeout = INITIAL_READ_TIMEOUT
    while HEADER_END not in buf:
        if not select.select([fd], [], [], timeout)[0]:
            break
        chunk = os.read(fd, READ_CHUNK_SIZE)
        if not chunk:
            break
        buf += chunk
        timeout = IDLE_READ_TIMEOUT

    header_part, terminator, body = buf.partition(HEADER_END)
    if not terminator:
        return buf, b""

    headers = _parse_header_lines(
        header_part.split(b"\r\n")[1:], lowercase_names=True
    )
    try:
        remaining = int(headers.get("content-length", 0)) - len(body)
    except ValueError:
        # Malformed Content-Length: keep only what has been read so far.
        remaining = 0

    chunks = [body]
    while remaining > 0:
        if not select.select([fd], [], [], IDLE_READ_TIMEOUT)[0]:
            break
        chunk = os.read(fd, remaining)
        if not chunk:
            break
        chunks.append(chunk)
        remaining -= len(chunk)
    return header_part, b"".join(chunks)


def _status_code(token: str) -> int:
    """Convert a status-code token to int, using 0 when it is not numeric."""
    try:
        return int(token)
    except ValueError:
        return 0


def parse_message(header_part: bytes, body: bytes, is_response: bool) -> dict:
    """Build the JSON-serialisable description of an HTTP message.

    Args:
        header_part: Start line and header lines, CRLF separated.
        body: The raw message body.
        is_response: Interpret the start line as a status line when true,
            as a request line otherwise.

    Returns:
        For responses: ``status_code``, ``status_text``, ``headers`` and
        ``body``.  For requests: ``method``, ``url``, ``path``,
        ``query_params``, ``headers`` and ``body``.  A missing or
        non-numeric status code is reported as 0 and a missing request
        target as "/".
    """
    lines = header_part.split(b"\r\n")
    first_line = decode(lines[0]).split(" ", 2)
    parsed = _parse_header_lines(lines[1:])
    headers = {
        "raw_base64": base64.b64encode(b"\r\n".join(lines[1:])).decode("ascii"),
        "parsed": parsed,
    }
    # Header names are case-insensitive, so do not rely on exact spelling.
    content_type = next(
        (value for name, value in parsed.items() if name.lower() == "content-type"),
        None,
    )

    if is_response:
        return {
            "status_code": _status_code(first_line[1]) if len(first_line) > 1 else 0,
            "status_text": first_line[2] if len(first_line) > 2 else "",
            "headers": headers,
            "body": parse_body(body, content_type),
        }

    path = first_line[1] if len(first_line) > 1 else "/"
    parsed_url = urlparse(path)
    return {
        "method": first_line[0],
        "url": path,
        "path": parsed_url.path,
        "query_params": _flatten_query(parsed_url.query),
        "headers": headers,
        "body": parse_body(body, content_type),
    }


@click.command()
@click.option("--response", "-r", is_flag=True, help="Parse as HTTP response instead of request")
@click.argument("file", required=False, type=click.Path(exists=True))
def main(response: bool, file: str | None):
    """Parse HTTP requests and responses into JSON."""
    # The docstring above doubles as the command's --help text.
    if file:
        with open(file, "rb") as f:
            data = f.read()
        # Without a header terminator the whole file is treated as headers.
        header_part, _, body = data.partition(HEADER_END)
    else:
        header_part, body = read_http_message(sys.stdin.fileno())
    print(json.dumps(parse_message(header_part, body, response), indent=2), flush=True)
    # Output is flushed above; exit immediately without interpreter cleanup.
    # NOTE(review): presumably this avoids hanging on a still-open stdin pipe - confirm.
    os._exit(0)


if __name__ == "__main__":
    main()