//! Anthropic Claude driver implementation use async_trait::async_trait; use async_stream::stream; use futures::{Stream, StreamExt}; use secrecy::{ExposeSecret, SecretString}; use reqwest::Client; use serde::{Deserialize, Serialize}; use std::pin::Pin; use zclaw_types::{Result, ZclawError}; use super::{CompletionRequest, CompletionResponse, ContentBlock, LlmDriver, StopReason}; use crate::stream::StreamChunk; /// Anthropic API driver pub struct AnthropicDriver { client: Client, api_key: SecretString, base_url: String, } impl AnthropicDriver { pub fn new(api_key: SecretString) -> Self { Self { client: Client::new(), api_key, base_url: "https://api.anthropic.com".to_string(), } } pub fn with_base_url(api_key: SecretString, base_url: String) -> Self { Self { client: Client::new(), api_key, base_url, } } } #[async_trait] impl LlmDriver for AnthropicDriver { fn provider(&self) -> &str { "anthropic" } fn is_configured(&self) -> bool { !self.api_key.expose_secret().is_empty() } async fn complete(&self, request: CompletionRequest) -> Result { let api_request = self.build_api_request(&request); let response = self.client .post(format!("{}/v1/messages", self.base_url)) .header("x-api-key", self.api_key.expose_secret()) .header("anthropic-version", "2023-06-01") .header("content-type", "application/json") .json(&api_request) .send() .await .map_err(|e| ZclawError::LlmError(format!("HTTP request failed: {}", e)))?; if !response.status().is_success() { let status = response.status(); let body = response.text().await.unwrap_or_default(); return Err(ZclawError::LlmError(format!("API error {}: {}", status, body))); } let api_response: AnthropicResponse = response .json() .await .map_err(|e| ZclawError::LlmError(format!("Failed to parse response: {}", e)))?; Ok(self.convert_response(api_response)) } fn stream( &self, request: CompletionRequest, ) -> Pin> + Send + '_>> { let mut stream_request = self.build_api_request(&request); stream_request.stream = true; let base_url = self.base_url.clone(); let api_key = self.api_key.expose_secret().to_string(); Box::pin(stream! { let response = match self.client .post(format!("{}/v1/messages", base_url)) .header("x-api-key", api_key) .header("anthropic-version", "2023-06-01") .header("content-type", "application/json") .json(&stream_request) .send() .await { Ok(r) => r, Err(e) => { yield Err(ZclawError::LlmError(format!("HTTP request failed: {}", e))); return; } }; if !response.status().is_success() { let status = response.status(); let body = response.text().await.unwrap_or_default(); yield Err(ZclawError::LlmError(format!("API error {}: {}", status, body))); return; } let mut byte_stream = response.bytes_stream(); let mut current_tool_id: Option = None; let mut tool_input_buffer = String::new(); while let Some(chunk_result) = byte_stream.next().await { let chunk = match chunk_result { Ok(c) => c, Err(e) => { yield Err(ZclawError::LlmError(format!("Stream error: {}", e))); continue; } }; let text = String::from_utf8_lossy(&chunk); for line in text.lines() { if let Some(data) = line.strip_prefix("data: ") { if data == "[DONE]" { continue; } match serde_json::from_str::(data) { Ok(event) => { match event.event_type.as_str() { "content_block_delta" => { if let Some(delta) = event.delta { if let Some(text) = delta.text { yield Ok(StreamChunk::TextDelta { delta: text }); } if let Some(thinking) = delta.thinking { yield Ok(StreamChunk::ThinkingDelta { delta: thinking }); } if let Some(json) = delta.partial_json { tool_input_buffer.push_str(&json); } } } "content_block_start" => { if let Some(block) = event.content_block { match block.block_type.as_str() { "tool_use" => { current_tool_id = block.id.clone(); yield Ok(StreamChunk::ToolUseStart { id: block.id.unwrap_or_default(), name: block.name.unwrap_or_default(), }); } _ => {} } } } "content_block_stop" => { if let Some(id) = current_tool_id.take() { let input: serde_json::Value = serde_json::from_str(&tool_input_buffer) .unwrap_or(serde_json::Value::Object(Default::default())); yield Ok(StreamChunk::ToolUseEnd { id, input, }); tool_input_buffer.clear(); } } "message_delta" => { if let Some(msg) = event.message { if msg.stop_reason.is_some() { yield Ok(StreamChunk::Complete { input_tokens: msg.usage.as_ref().map(|u| u.input_tokens).unwrap_or(0), output_tokens: msg.usage.as_ref().map(|u| u.output_tokens).unwrap_or(0), stop_reason: msg.stop_reason.unwrap_or_else(|| "end_turn".to_string()), }); } } } "error" => { let error_msg = serde_json::from_str::(&data) .ok() .and_then(|v| v.get("error").and_then(|e| e.get("message")).and_then(|m| m.as_str().map(String::from))) .unwrap_or_else(|| format!("Stream error: {}", &data[..data.len().min(200)])); yield Ok(StreamChunk::Error { message: error_msg, }); } _ => {} } } Err(e) => { tracing::warn!("Failed to parse SSE event: {} - {}", e, data); } } } } } }) } } impl AnthropicDriver { fn build_api_request(&self, request: &CompletionRequest) -> AnthropicRequest { let messages: Vec = request.messages .iter() .filter_map(|msg| match msg { zclaw_types::Message::User { content } => Some(AnthropicMessage { role: "user".to_string(), content: vec!(ContentBlock::Text { text: content.clone() }), }), zclaw_types::Message::Assistant { content, thinking } => { let mut blocks = Vec::new(); if let Some(think) = thinking { blocks.push(ContentBlock::Thinking { thinking: think.clone() }); } blocks.push(ContentBlock::Text { text: content.clone() }); Some(AnthropicMessage { role: "assistant".to_string(), content: blocks, }) } zclaw_types::Message::ToolUse { id, tool, input } => Some(AnthropicMessage { role: "assistant".to_string(), content: vec![ContentBlock::ToolUse { id: id.clone(), name: tool.to_string(), input: input.clone(), }], }), zclaw_types::Message::ToolResult { tool_call_id: _, tool: _, output, is_error } => { let content = if *is_error { format!("Error: {}", output) } else { output.to_string() }; Some(AnthropicMessage { role: "user".to_string(), content: vec![ContentBlock::Text { text: content }], }) } _ => None, }) .collect(); let tools: Vec = request.tools .iter() .map(|t| AnthropicTool { name: t.name.clone(), description: t.description.clone(), input_schema: t.input_schema.clone(), }) .collect(); let requested_max = request.max_tokens.unwrap_or(4096); let (thinking, budget) = if request.thinking_enabled { let budget = match request.reasoning_effort.as_deref() { Some("low") => 2000, Some("medium") => 10000, Some("high") => 32000, _ => 10000, // default }; (Some(AnthropicThinking { r#type: "enabled".to_string(), budget_tokens: budget, }), budget) } else { (None, 0) }; // When thinking is enabled, max_tokens is the TOTAL budget (thinking + text). // Use the maximum output limit (65536) so thinking can consume whatever it // needs without starving the text response. We only pay for tokens actually // generated, so a high limit costs nothing extra. let effective_max = if budget > 0 { 65536 } else { requested_max }; AnthropicRequest { model: request.model.clone(), max_tokens: effective_max, system: request.system.clone(), messages, tools: if tools.is_empty() { None } else { Some(tools) }, temperature: request.temperature, stop_sequences: if request.stop.is_empty() { None } else { Some(request.stop.clone()) }, stream: request.stream, thinking, } } fn convert_response(&self, api_response: AnthropicResponse) -> CompletionResponse { let content: Vec = api_response.content .into_iter() .map(|block| match block.block_type.as_str() { "text" => ContentBlock::Text { text: block.text.unwrap_or_default() }, "thinking" => ContentBlock::Thinking { thinking: block.thinking.unwrap_or_default() }, "tool_use" => ContentBlock::ToolUse { id: block.id.unwrap_or_default(), name: block.name.unwrap_or_default(), input: block.input.unwrap_or(serde_json::Value::Null), }, _ => ContentBlock::Text { text: String::new() }, }) .collect(); let stop_reason = match api_response.stop_reason.as_deref() { Some("end_turn") => StopReason::EndTurn, Some("max_tokens") => StopReason::MaxTokens, Some("stop_sequence") => StopReason::StopSequence, Some("tool_use") => StopReason::ToolUse, _ => StopReason::EndTurn, }; CompletionResponse { content, model: api_response.model, input_tokens: api_response.usage.input_tokens, output_tokens: api_response.usage.output_tokens, stop_reason, } } } // Anthropic API types #[derive(Serialize)] struct AnthropicRequest { model: String, max_tokens: u32, #[serde(skip_serializing_if = "Option::is_none")] system: Option, messages: Vec, #[serde(skip_serializing_if = "Option::is_none")] tools: Option>, #[serde(skip_serializing_if = "Option::is_none")] temperature: Option, #[serde(skip_serializing_if = "Option::is_none")] stop_sequences: Option>, #[serde(default)] stream: bool, #[serde(skip_serializing_if = "Option::is_none")] thinking: Option, } #[derive(Serialize)] struct AnthropicThinking { r#type: String, budget_tokens: u32, } #[derive(Serialize)] struct AnthropicMessage { role: String, content: Vec, } #[derive(Serialize)] struct AnthropicTool { name: String, description: String, input_schema: serde_json::Value, } #[derive(Deserialize)] struct AnthropicResponse { content: Vec, model: String, stop_reason: Option, usage: AnthropicUsage, } #[derive(Deserialize)] struct AnthropicContentBlock { #[serde(rename = "type")] block_type: String, text: Option, thinking: Option, id: Option, name: Option, input: Option, } #[derive(Deserialize)] struct AnthropicUsage { input_tokens: u32, output_tokens: u32, } // Streaming types /// SSE event from Anthropic API #[derive(Debug, Deserialize)] struct AnthropicStreamEvent { #[serde(rename = "type")] event_type: String, #[serde(default)] #[allow(dead_code)] // Used for deserialization, not accessed index: Option, #[serde(default)] delta: Option, #[serde(default)] content_block: Option, #[serde(default)] message: Option, } #[derive(Debug, Deserialize)] struct AnthropicDelta { #[serde(default)] text: Option, #[serde(default)] thinking: Option, #[serde(default)] partial_json: Option, } #[derive(Debug, Deserialize)] struct AnthropicStreamContentBlock { #[serde(rename = "type")] block_type: String, #[serde(default)] id: Option, #[serde(default)] name: Option, } #[derive(Debug, Deserialize)] struct AnthropicStreamMessage { #[serde(default)] stop_reason: Option, #[serde(default)] usage: Option, } #[derive(Debug, Deserialize)] struct AnthropicStreamUsage { #[serde(default)] input_tokens: u32, #[serde(default)] output_tokens: u32, }