import json import os import time from mitmproxy import http class CaptureAddon: def __init__(self): self.output_dir = os.environ.get("CAPTURE_DIR", "/captures") os.makedirs(self.output_dir, exist_ok=True) self.counter = 0 def request(self, flow: http.HTTPFlow): if "googleapis.com" not in (flow.request.host or ""): return self.counter += 1 flow.metadata["capture_id"] = self.counter body = None try: body = json.loads(flow.request.get_text()) except Exception: body = flow.request.get_text() source = flow.request.headers.get("x-capture-source") or ( "gemini-cli" if "GeminiCLI" in flow.request.headers.get("user-agent", "") else "unknown" ) data = { "capture_id": self.counter, "source": source, "timestamp": time.time(), "method": flow.request.method, "url": flow.request.pretty_url, "headers": dict(flow.request.headers), "body": body, } path = os.path.join(self.output_dir, f"req_{self.counter:04d}.json") with open(path, "w") as f: json.dump(data, f, indent=2, default=str) print( f"[capture] #{self.counter} [{source}] {flow.request.method} {flow.request.pretty_url}" ) def response(self, flow: http.HTTPFlow): if "googleapis.com" not in (flow.request.host or ""): return capture_id = flow.metadata.get("capture_id") if not capture_id: return body = None try: body = json.loads(flow.response.get_text()) except Exception: body = flow.response.get_text()[:4000] path = os.path.join(self.output_dir, f"res_{capture_id:04d}.json") with open(path, "w") as f: json.dump( { "status": flow.response.status_code, "headers": dict(flow.response.headers), "body": body, }, f, indent=2, default=str, ) addons = [CaptureAddon()]