diff options
Diffstat (limited to 'httpdump.py')
| -rw-r--r-- | httpdump.py | 170 |
1 files changed, 170 insertions, 0 deletions
diff --git a/httpdump.py b/httpdump.py new file mode 100644 index 0000000..68f6a26 --- /dev/null +++ b/httpdump.py @@ -0,0 +1,170 @@ +#!/usr/bin/env python3 +import base64 +import json +import os +import select +import sys +from urllib.parse import parse_qs, urlparse + +import click + + +def decode(b: bytes, encoding: str = "ascii") -> str: + try: + return b.decode(encoding) + except (UnicodeDecodeError, AttributeError): + return "" + + +def parse_multipart(body: bytes, boundary: bytes) -> dict: + parts = {} + for part in body.split(b"--" + boundary): + part = part.strip() + if not part or part == b"--": + continue + if b"\r\n\r\n" in part: + headers_raw, content = part.split(b"\r\n\r\n", 1) + elif b"\n\n" in part: + headers_raw, content = part.split(b"\n\n", 1) + else: + continue + + headers = {} + for line in headers_raw.split(b"\n"): + if b": " in line: + k, v = line.strip().split(b": ", 1) + headers[decode(k).lower()] = decode(v) + + disposition = headers.get("content-disposition", "") + name = None + filename = None + for item in disposition.split(";"): + item = item.strip() + if item.startswith("name="): + name = item[5:].strip('"') + elif item.startswith("filename="): + filename = item[9:].strip('"') + + if name: + content = content.rstrip(b"\r\n-") + if filename: + parts[name] = {"filename": filename, "content": decode(content, "utf-8")} + else: + parts[name] = decode(content, "utf-8") + return parts + + +def parse_body(body: bytes, content_type: str | None) -> dict: + result = {"raw_base64": base64.b64encode(body).decode("ascii")} + if body and content_type: + if "application/json" in content_type: + try: + result["json"] = json.loads(body) + except (json.JSONDecodeError, UnicodeDecodeError): + pass + elif "application/x-www-form-urlencoded" in content_type: + try: + result["form"] = {k: v[0] if len(v) == 1 else v for k, v in parse_qs(decode(body, "utf-8")).items()} + except: + pass + elif "multipart/form-data" in content_type: + for part in 
content_type.split(";"): + if "boundary=" in part: + boundary = part.split("boundary=", 1)[1].strip().encode() + result["multipart"] = parse_multipart(body, boundary) + break + return result + + +def read_http_message(fd: int) -> tuple[bytes, bytes]: + buf = b"" + timeout = 30.0 + while b"\r\n\r\n" not in buf: + if not select.select([fd], [], [], timeout)[0]: + break + chunk = os.read(fd, 4096) + if not chunk: + break + buf += chunk + timeout = 0.1 + + if b"\r\n\r\n" not in buf: + return buf, b"" + + header_part, _, body = buf.partition(b"\r\n\r\n") + headers = {} + for line in header_part.split(b"\r\n")[1:]: + if b": " in line: + k, v = line.split(b": ", 1) + headers[decode(k).lower()] = decode(v) + + remaining = int(headers.get("content-length", 0)) - len(body) + while remaining > 0: + if not select.select([fd], [], [], 0.1)[0]: + break + chunk = os.read(fd, remaining) + if not chunk: + break + body += chunk + remaining -= len(chunk) + + return header_part, body + + +def parse_message(header_part: bytes, body: bytes, is_response: bool) -> dict: + lines = header_part.split(b"\r\n") + first_line = decode(lines[0]).split(" ", 2) + + parsed = {} + for line in lines[1:]: + if b": " in line: + k, v = line.split(b": ", 1) + parsed[decode(k)] = decode(v) + headers = { + "raw_base64": base64.b64encode(b"\r\n".join(lines[1:])).decode("ascii"), + "parsed": parsed, + } + + content_type = parsed.get("Content-Type") + + if is_response: + return { + "status_code": int(first_line[1]) if len(first_line) > 1 else 0, + "status_text": first_line[2] if len(first_line) > 2 else "", + "headers": headers, + "body": parse_body(body, content_type), + } + + path = first_line[1] if len(first_line) > 1 else "/" + parsed_url = urlparse(path) + return { + "method": first_line[0], + "url": path, + "path": parsed_url.path, + "query_params": {k: v[0] if len(v) == 1 else v for k, v in parse_qs(parsed_url.query).items()}, + "headers": headers, + "body": parse_body(body, content_type), + } + + 
import click  # third-party CLI toolkit; only the entry point below uses it


@click.command()
@click.option("--response", "-r", is_flag=True, help="Parse as HTTP response instead of request")
@click.argument("file", required=False, type=click.Path(exists=True))
def main(response: bool, file: str | None):
    """Parse HTTP requests and responses into JSON."""
    # The docstring above doubles as the command's --help text.
    if not file:
        # No path on the command line: read a single message from stdin.
        header_part, body = read_http_message(sys.stdin.fileno())
    else:
        with open(file, "rb") as handle:
            raw = handle.read()
        # partition() leaves the body empty when the blank line that ends
        # the headers is missing, so the whole file counts as headers then.
        header_part, _, body = raw.partition(b"\r\n\r\n")

    message = parse_message(header_part, body, response)
    print(json.dumps(message, indent=2), flush=True)
    # os._exit ends the process at once, skipping normal interpreter
    # shutdown; the output was already flushed by the print above.
    os._exit(0)


if __name__ == "__main__":
    main()
