"""
Agent wrapper provider implementation.

Wraps chat providers to provide agentic capabilities.
"""

import inspect
import sys
from typing import Any, AsyncIterator

from multillm import (
    BaseAgentProvider,
    AgentMessage,
    AgentOptions,
    Tool,
    ProviderError,
    load_provider_config,
    merge_config,
)


class AgentWrapProvider(BaseAgentProvider):
    """
    Agent wrapper provider that wraps chat providers with agentic capabilities.

    The model parameter should be the chat provider and model to wrap.
    For example, when using "agentwrap/google/gemini":
    - Provider: "agentwrap"
    - Model: "google/gemini" (passed to this provider)

    This provider will:
    1. Use the specified chat provider internally via chat_complete()
    2. Implement tool execution loop
    3. Manage conversation history
    4. Provide agentic multi-turn interactions

    Usage:
        # Via client
        client = multillm.Client()
        async for msg in client.agent_run("agentwrap/google/gemini", "Hello"):
            print(msg)

        # With tools
        async for msg in client.agent_run(
            "agentwrap/openai/gpt-4",
            "What's 2+2?",
            tools=[calculate_tool],
        ):
            print(msg)
    """

    PROVIDER_NAME = "agentwrap"

    def __init__(self, config: dict[str, Any] | None = None):
        """
        Initialise the provider.

        Args:
            config: Optional provider configuration, forwarded to the base class.
        """
        super().__init__(config)
        # Created lazily by _get_client() on first use.
        self._client = None

    def _get_client(self):
        """Get or create client instance for making chat API calls."""
        if self._client is None:
            # Import here to avoid circular dependency
            from multillm import Client

            self._client = Client()
        return self._client

    def _build_options(self, options: AgentOptions | None) -> dict[str, Any]:
        """
        Build the keyword options passed to the wrapped provider.

        Args:
            options: Agent options, or None.

        Returns:
            A dict holding "system_prompt" (when set) overlaid with every entry
            of ``options.extra`` (temperature, max_tokens, etc.). Empty when
            ``options`` is None.
        """
        if options is None:
            return {}

        opts: dict[str, Any] = {}
        if options.system_prompt:
            opts["system_prompt"] = options.system_prompt

        # Extra options win over anything set above.
        if options.extra:
            opts.update(options.extra)

        return opts

    async def _execute_tool(
        self,
        tool_call: dict,
        tools: list[Tool] | None,
    ) -> dict:
        """
        Execute a tool call and return the result.

        Args:
            tool_call: Tool call from chat response (OpenAI format)
            tools: List of available tools with handlers

        Returns:
            Tool result dict with a 'content' key. When the tool is unknown,
            has no handler, or its handler raises, the dict also carries
            'is_error': True and 'content' describes the failure.
        """
        function_name = tool_call["function"]["name"]
        # NOTE(review): in the OpenAI wire format "arguments" is a JSON-encoded
        # string; this passes the value through unchanged. Confirm whether
        # chat_complete() already decodes it before handlers rely on a dict.
        function_args = tool_call["function"].get("arguments", {})

        # First tool whose name matches, if any.
        tool = next((t for t in tools or () if t.name == function_name), None)
        if tool is None:
            return {
                "content": f"Tool '{function_name}' not found",
                "is_error": True,
            }

        # A tool may be declared without a handler; report that explicitly
        # instead of surfacing "'NoneType' object is not callable".
        handler = getattr(tool, "handler", None)
        if handler is None:
            return {
                "content": f"Tool '{function_name}' has no handler",
                "is_error": True,
            }

        try:
            result = handler(function_args)
            # Support both sync and async handlers.
            if inspect.isawaitable(result):
                result = await result
            return {"content": str(result)}
        except Exception as e:
            # Best-effort: report the failure back to the model rather than
            # aborting the whole agentic loop.
            return {
                "content": f"Error executing tool: {e}",
                "is_error": True,
            }

    async def run(
        self,
        prompt: str,
        options: AgentOptions | None = None,
        tools: list[Tool] | None = None,
    ) -> AsyncIterator[AgentMessage]:
        """
        Run agentic workflow with the wrapped chat provider.

        Args:
            prompt: User message to send
            options: Agent options (max_turns, system_prompt, etc.)
            tools: Optional tools the agent can use

        Yields:
            AgentMessage objects representing the agent's actions and responses:
            a "system" message at start, then "text" / "tool_use" /
            "tool_result" messages per turn, and a final "result" message
            carrying the last non-empty text. An "error" message is yielded
            just before a failure is raised.

        Raises:
            ProviderError: If 'wrapped_model' is missing from the merged config,
                or if any exception occurs inside the tool execution loop.
        """
        # Yield session start message
        yield AgentMessage(
            type="system",
            content="Agentic session started",
            raw=None,
        )

        # Get wrapped model from config
        # When client routes "agentwrap/google/gemini", we receive "google/gemini" as model
        file_config = load_provider_config(self.PROVIDER_NAME)
        merged_config = merge_config(file_config, self.config, {})
        wrapped_model = merged_config.get("wrapped_model")

        if not wrapped_model:
            raise ProviderError(
                "AgentWrap provider requires 'wrapped_model' in config. "
                "When using via client, the model should be specified as 'agentwrap/provider/model'."
            )

        # Build options for chat API
        chat_options = self._build_options(options)

        # A missing or falsy max_turns falls back to 10.
        max_turns = options.max_turns if options and options.max_turns else 10

        # Initialize conversation history
        messages = []

        # NOTE(review): the system prompt is sent both here as a system message
        # and, via _build_options(), as a "system_prompt" keyword argument.
        # Confirm that chat_complete() does not apply it twice.
        if options and options.system_prompt:
            messages.append({
                "role": "system",
                "content": options.system_prompt,
            })

        # Add user message
        messages.append({
            "role": "user",
            "content": prompt,
        })

        client = self._get_client()

        # Tool execution loop
        final_text = ""
        for _turn in range(max_turns):
            try:
                # Call chat_complete with wrapped model
                response = await client.chat_complete(
                    wrapped_model,
                    messages,
                    tools=tools,
                    **chat_options,
                )

                message = response.choices[0].message
                text = message.content or ""
                tool_calls = message.tool_calls or []

                # Add assistant message to history
                messages.append({
                    "role": "assistant",
                    "content": text,
                    "tool_calls": tool_calls or None,
                })

                # Yield text message if present
                if text:
                    final_text = text
                    yield AgentMessage(
                        type="text",
                        content=text,
                        raw=response,
                    )

                # No tool calls means the model has finished.
                if not tool_calls:
                    break

                # Process tool calls
                for tool_call in tool_calls:
                    tool_name = tool_call["function"]["name"]

                    # Yield tool use message
                    yield AgentMessage(
                        type="tool_use",
                        tool_name=tool_name,
                        tool_input=tool_call["function"].get("arguments", {}),
                        raw=tool_call,
                    )

                    tool_result = await self._execute_tool(tool_call, tools)

                    # Yield tool result message
                    yield AgentMessage(
                        type="tool_result",
                        tool_name=tool_name,
                        tool_result=tool_result["content"],
                        raw=tool_result,
                    )

                    # Feed the result back so the next turn can use it.
                    messages.append({
                        "role": "tool",
                        "tool_call_id": tool_call["id"],
                        "name": tool_name,
                        "content": tool_result["content"],
                    })

            except Exception as e:
                # Yield error and stop
                error_msg = f"Error in agentic loop: {e}"
                print(f"\n{error_msg}", file=sys.stderr)
                yield AgentMessage(
                    type="error",
                    content=error_msg,
                    raw=e,
                )
                raise ProviderError(error_msg) from e

        # Yield final result
        yield AgentMessage(
            type="result",
            content=final_text,
            raw=None,
        )

    async def run_interactive(
        self,
        options: AgentOptions | None = None,
        tools: list[Tool] | None = None,
    ):
        """
        Interactive sessions not yet implemented for agentwrap.

        Use multiple calls to run() instead.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError(
            "Interactive sessions not yet implemented for agentwrap provider. "
            "Use multiple calls to run() for multi-turn conversations."
        )