""" Claude API client with streaming support. This client replicates the exact API behavior of Claude Code CLI v2.1.7, with full streaming support for better latency and user experience. """ from __future__ import annotations import hashlib import json import os import platform import uuid from collections.abc import AsyncIterator, Callable from pathlib import Path from typing import Any import httpx from .streaming import StreamParser, parse_sse_stream from .types import ( AgentOptions, AssistantMessage, StreamChunk, ToolResult, ) class ClaudeAgentClient: """ Async client for Claude API with streaming support. Features: - Streaming responses by default (better latency/UX) - Non-streaming mode available - Exact API replication from Claude Code - Prompt caching for 90% cost reduction - All 17 tools from Claude Code Example: # Streaming (default) async with ClaudeAgentClient() as client: async for chunk in client.send_message_stream("Hello"): if chunk.text_delta: print(chunk.text_delta, end='', flush=True) # Non-streaming client = ClaudeAgentClient(options=AgentOptions(stream=False)) response = await client.send_message("Hello") print(response.content) """ BASE_URL = "https://api.anthropic.com" API_VERSION = "2023-06-01" MODEL_INFO = { "haiku": {"display": "Haiku 4.5", "cutoff": "February 2025"}, "sonnet": {"display": "Sonnet 4.6", "cutoff": "August 2025"}, "opus": {"display": "Opus 4.6", "cutoff": "May 2025"}, } def __init__(self, api_key: str | None = None, options: AgentOptions | None = None): """ Initialize Claude client. Args: api_key: Anthropic API key (or set ANTHROPIC_API_KEY env var) options: Configuration options """ self.api_key = ( api_key or os.getenv("ANTHROPIC_API_KEY") or os.getenv("CLAUDE_CODE_OAUTH_TOKEN") ) if not self.api_key: raise ValueError("API key required: set ANTHROPIC_API_KEY or CLAUDE_CODE_OAUTH_TOKEN") self.options = options or AgentOptions() self.client = httpx.AsyncClient(http2=True, timeout=600.0) # Session management self.session_id = self.options.session_id or str(uuid.uuid4()) self.messages: list[dict[str, Any]] = [] self.device_id = ( self.options.device_id or hashlib.sha256(platform.node().encode()).hexdigest() ) self.account_uuid = self.options.account_uuid or os.getenv("CLAUDE_ACCOUNT_UUID") # Load tools and system prompt self._tools = self._load_tools() self._system_prompt = self._load_system_prompt() def _load_tools(self) -> list[dict[str, Any]]: """Load tool definitions from tools.json.""" tools_file = Path(__file__).parent / "tools.json" if tools_file.exists(): with open(tools_file) as f: return json.load(f) return [] def _load_system_prompt(self) -> list[dict[str, Any]]: """Load system prompt from system_prompt.json.""" prompt_file = Path(__file__).parent / "system_prompt.json" if prompt_file.exists(): with open(prompt_file) as f: return json.load(f) return [] BETA_BASE = "oauth-2025-04-20,interleaved-thinking-2025-05-14,prompt-caching-scope-2026-01-05,claude-code-20250219" BETA_ADAPTIVE = "claude-code-20250219,oauth-2025-04-20,interleaved-thinking-2025-05-14,prompt-caching-scope-2026-01-05,effort-2025-11-24,adaptive-thinking-2026-01-28" def _supports_adaptive(self) -> bool: m = self.options.model.lower() return "sonnet" in m or "opus" in m def _build_headers(self, retry_count: int = 0, streaming: bool = True) -> dict[str, str]: beta = self.BETA_ADAPTIVE if self._supports_adaptive() else self.BETA_BASE return { "accept": "application/json", "accept-language": "*", "anthropic-beta": beta, "anthropic-dangerous-direct-browser-access": "true", "anthropic-version": self.API_VERSION, "authorization": f"Bearer {self.api_key}", "content-type": "application/json", "sec-fetch-mode": "cors", "user-agent": "claude-cli/2.1.63 (external, sdk-cli)", "x-app": "cli", "x-stainless-arch": "x64", "x-stainless-lang": "js", "x-stainless-os": "Linux", "x-stainless-package-version": "0.74.0", "x-stainless-retry-count": str(retry_count), "x-stainless-runtime": "node", "x-stainless-runtime-version": "v20.20.0", "x-stainless-timeout": "600", } def _build_metadata(self) -> dict[str, str]: """ Build metadata object that matches captured format exactly. Format from captured traffic: "user_{device_hash}_account_{uuid}_session_{uuid}" """ account = self.account_uuid or str(uuid.uuid4()) user_id = f"user_{self.device_id}_account_{account}_session_{self.session_id}" return {"user_id": user_id} def _build_thinking(self) -> dict[str, Any]: if self._supports_adaptive(): return {"type": "adaptive"} return {"type": "enabled", "budget_tokens": self.options.max_tokens - 1} def _build_request_body(self, stream: bool) -> dict[str, Any]: body: dict[str, Any] = { "model": self.options.model, "max_tokens": self.options.max_tokens, "messages": self.messages, "system": self._build_system_prompt(), "tools": self._filter_tools(), "metadata": self._build_metadata(), "stream": stream, "thinking": self._build_thinking(), } if self.options.effort and self._supports_adaptive(): body["output_config"] = {"effort": self.options.effort} return body def _model_family(self) -> str: m = self.options.model.lower() for family in ("haiku", "sonnet", "opus"): if family in m: return family return "sonnet" def _build_system_prompt(self) -> list[dict[str, Any]]: if self.options.system_prompt: return [{"type": "text", "text": self.options.system_prompt}] import copy system = copy.deepcopy(self._system_prompt) info = self.MODEL_INFO.get(self._model_family(), self.MODEL_INFO["sonnet"]) for block in system: if "{model_display_name}" in block.get("text", ""): block["text"] = ( block["text"] .replace("{model_display_name}", info["display"]) .replace("{model_id}", self.options.model) .replace("{knowledge_cutoff}", info["cutoff"]) ) if self.options.append_system_prompt: system.append({"type": "text", "text": self.options.append_system_prompt}) return system def _filter_tools(self) -> list[dict[str, Any]]: """Filter tools based on allowed/disallowed lists.""" tools = self._tools.copy() if self.options.allowed_tools: tools = [t for t in tools if t["name"] in self.options.allowed_tools] if self.options.disallowed_tools: tools = [t for t in tools if t["name"] not in self.options.disallowed_tools] return tools async def send_message_stream( self, prompt: str, tool_results: list[ToolResult] | None = None, on_text: Callable[[str], None] | None = None, ) -> AsyncIterator[StreamChunk]: """ Send message and stream response chunks. Args: prompt: User message (can be empty if only sending tool results) tool_results: Optional tool execution results on_text: Optional callback for text deltas (for real-time display) Yields: StreamChunk objects with event data and text deltas Example: async for chunk in client.send_message_stream("Hello"): if chunk.text_delta: print(chunk.text_delta, end='', flush=True) """ # Build message content blocks: list[dict[str, Any]] = [] # Add tool results first (if any) if tool_results: for result in tool_results: blocks.append(result.model_dump()) # Add text prompt msg_content: str | list[dict[str, Any]] if prompt: if blocks: blocks.append({"type": "text", "text": prompt}) msg_content = blocks else: msg_content = prompt else: msg_content = blocks user_message: dict[str, Any] = {"role": "user", "content": msg_content} self.messages.append(user_message) request_body = self._build_request_body(stream=True) url = f"{self.BASE_URL}/v1/messages?beta=true" headers = self._build_headers(streaming=True) # Use httpx_sse to handle SSE streaming parser = StreamParser() async for chunk in parse_sse_stream(self.client, "POST", url, headers, request_body): parser.add_chunk(chunk) # Call text callback if provided if on_text and chunk.text_delta: on_text(chunk.text_delta) yield chunk # Add complete assistant message to history assistant_message = parser.to_dict() self.messages.append({"role": "assistant", "content": assistant_message["content"]}) async def send_message( self, prompt: str, tool_results: list[ToolResult] | None = None, ) -> AssistantMessage: """ Send message and get complete response. This method collects the full streaming response and returns it as a single AssistantMessage object. For real-time streaming with immediate feedback, use send_message_stream(). Args: prompt: User message (can be empty if only sending tool results) tool_results: Optional tool execution results Returns: Complete AssistantMessage with all content blocks Example: response = await client.send_message("What is 2+2?") print(response.content[0]['text']) """ if not self.options.stream: # Non-streaming mode return await self._send_message_non_stream(prompt, tool_results) # Streaming mode - collect all chunks async for _chunk in self.send_message_stream(prompt, tool_results): pass # Get message from last state message_dict = self.messages[-1] if self.messages else {} # Build AssistantMessage from last message return AssistantMessage( id=str(uuid.uuid4()), # Generate ID for non-streamed type="message", role="assistant", content=message_dict.get("content", []), model=self.options.model, stop_reason="end_turn", ) async def _send_message_non_stream( self, prompt: str, tool_results: list[ToolResult] | None = None, ) -> AssistantMessage: """ Send message without streaming (traditional request-response). This is faster for batch processing but slower for interactive use. """ # Build message content blocks: list[dict[str, Any]] = [] if tool_results: for result in tool_results: blocks.append(result.model_dump()) msg_content: str | list[dict[str, Any]] if prompt: if blocks: blocks.append({"type": "text", "text": prompt}) msg_content = blocks else: msg_content = prompt else: msg_content = blocks user_message: dict[str, Any] = {"role": "user", "content": msg_content} self.messages.append(user_message) request_body = self._build_request_body(stream=False) url = f"{self.BASE_URL}/v1/messages?beta=true" headers = self._build_headers(streaming=False) response = await self.client.post(url, headers=headers, json=request_body) response.raise_for_status() # Parse response data = response.json() assistant_message = AssistantMessage.model_validate(data) # Add to conversation history self.messages.append( { "role": "assistant", "content": data["content"], } ) return assistant_message async def count_tokens(self) -> int: """ Count tokens for current conversation using count_tokens endpoint. Endpoint structure captured from mitmproxy traffic. """ request_body = { "model": self.options.model, "max_tokens": self.options.max_tokens, "messages": self.messages, "system": self._build_system_prompt(), "tools": self._filter_tools(), } url = f"{self.BASE_URL}/v1/messages/count_tokens?beta=true" headers = self._build_headers(streaming=False) response = await self.client.post(url, headers=headers, json=request_body) response.raise_for_status() data = response.json() return data.get("input_tokens", 0) async def close(self): """Close the HTTP client.""" await self.client.aclose() async def __aenter__(self): return self async def __aexit__(self, exc_type, exc_val, exc_tb): await self.close() async def query( prompt: str, options: AgentOptions | None = None, api_key: str | None = None, stream: bool = False, ) -> AssistantMessage: """ Simple query function for one-shot requests. Example: result = await query("Read the file test.txt") print(result.content) """ opts = options or AgentOptions() async with ClaudeAgentClient(api_key=api_key, options=opts) as client: if stream: # Collect streamed response parser = StreamParser() async for chunk in client.send_message_stream(prompt): parser.add_chunk(chunk) message_dict = parser.to_dict() return AssistantMessage.model_validate(message_dict) else: return await client.send_message(prompt)