fix(openai): resolve DashScope/Bailian tool calling 400 errors
Some checks failed
CI / Lint & TypeCheck (push) Has been cancelled
CI / Unit Tests (push) Has been cancelled
CI / Build Frontend (push) Has been cancelled
CI / Rust Check (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
CI / E2E Tests (push) Has been cancelled

- Detect providers that don't support streaming with tools (DashScope, aliyuncs, bigmodel.cn)
- Add stream_from_complete() to use non-streaming mode when tools are present
- Fix convert_response() to prioritize tool_calls over empty content
- Fix ToolUse message JSON serialization (Null -> "{}")
- Skip invalid tool calls with empty names in streaming

Root cause: DashScope Coding Plan API doesn't support stream=true with tools,
causing tool parameters to be lost or malformed.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
iven
2026-03-24 21:43:03 +08:00
parent 9981a4674e
commit 0179f947aa
2 changed files with 505 additions and 21 deletions

View File

@@ -25,6 +25,8 @@ impl OpenAiDriver {
client: Client::builder()
.user_agent(crate::USER_AGENT)
.http1_only()
.timeout(std::time::Duration::from_secs(120)) // 2 minute timeout
.connect_timeout(std::time::Duration::from_secs(30)) // 30 second connect timeout
.build()
.unwrap_or_else(|_| Client::new()),
api_key,
@@ -37,6 +39,8 @@ impl OpenAiDriver {
client: Client::builder()
.user_agent(crate::USER_AGENT)
.http1_only()
.timeout(std::time::Duration::from_secs(120)) // 2 minute timeout
.connect_timeout(std::time::Duration::from_secs(30)) // 30 second connect timeout
.build()
.unwrap_or_else(|_| Client::new()),
api_key,
@@ -94,23 +98,54 @@ impl LlmDriver for OpenAiDriver {
&self,
request: CompletionRequest,
) -> Pin<Box<dyn Stream<Item = Result<StreamChunk>> + Send + '_>> {
// Check if we should use non-streaming mode for tool calls
// Some providers don't support streaming with tools:
// - Alibaba DashScope: "tools暂时无法与stream=True同时使用"
// - Zhipu GLM: May have similar limitations
let has_tools = !request.tools.is_empty();
let needs_non_streaming = self.base_url.contains("dashscope") ||
self.base_url.contains("aliyuncs") ||
self.base_url.contains("bigmodel.cn");
eprintln!("[OpenAiDriver:stream] base_url={}, has_tools={}, needs_non_streaming={}",
self.base_url, has_tools, needs_non_streaming);
if has_tools && needs_non_streaming {
eprintln!("[OpenAiDriver:stream] Provider detected that may not support streaming with tools, using non-streaming mode. URL: {}", self.base_url);
// Use non-streaming mode and convert to stream
return self.stream_from_complete(request);
}
let mut stream_request = self.build_api_request(&request);
stream_request.stream = true;
// Debug: log the request details
let url = format!("{}/chat/completions", self.base_url);
let request_body = serde_json::to_string(&stream_request).unwrap_or_default();
tracing::debug!("[OpenAiDriver:stream] Sending request to: {}", url);
tracing::debug!("[OpenAiDriver:stream] Request body length: {} bytes", request_body.len());
tracing::trace!("[OpenAiDriver:stream] Request body: {}", request_body);
let base_url = self.base_url.clone();
let api_key = self.api_key.expose_secret().to_string();
Box::pin(stream! {
tracing::debug!("[OpenAiDriver:stream] Starting HTTP request...");
let response = match self.client
.post(format!("{}/chat/completions", base_url))
.header("Authorization", format!("Bearer {}", api_key))
.header("Content-Type", "application/json")
.timeout(std::time::Duration::from_secs(120)) // 2 minute timeout
.json(&stream_request)
.send()
.await
{
Ok(r) => r,
Ok(r) => {
tracing::debug!("[OpenAiDriver:stream] Got response, status: {}", r.status());
r
},
Err(e) => {
tracing::error!("[OpenAiDriver:stream] HTTP request failed: {:?}", e);
yield Err(ZclawError::LlmError(format!("HTTP request failed: {}", e)));
return;
}
@@ -124,6 +159,8 @@ impl LlmDriver for OpenAiDriver {
}
let mut byte_stream = response.bytes_stream();
let mut accumulated_tool_calls: std::collections::HashMap<String, (String, String)> = std::collections::HashMap::new();
let mut current_tool_id: Option<String> = None;
while let Some(chunk_result) = byte_stream.next().await {
let chunk = match chunk_result {
@@ -138,6 +175,31 @@ impl LlmDriver for OpenAiDriver {
for line in text.lines() {
if let Some(data) = line.strip_prefix("data: ") {
if data == "[DONE]" {
tracing::debug!("[OpenAI] Stream done, accumulated_tool_calls: {:?}", accumulated_tool_calls.len());
// Emit ToolUseEnd for all accumulated tool calls (skip invalid ones with empty name)
for (id, (name, args)) in &accumulated_tool_calls {
// Skip tool calls with empty name - they are invalid
if name.is_empty() {
tracing::warn!("[OpenAI] Skipping invalid tool call with empty name: id={}", id);
continue;
}
tracing::debug!("[OpenAI] Emitting ToolUseEnd: id={}, name={}, args={}", id, name, args);
// Ensure parsed args is always a valid JSON object
let parsed_args: serde_json::Value = if args.is_empty() {
serde_json::json!({})
} else {
serde_json::from_str(args).unwrap_or_else(|e| {
tracing::warn!("[OpenAI] Failed to parse tool args '{}': {}, using empty object", args, e);
serde_json::json!({})
})
};
yield Ok(StreamChunk::ToolUseEnd {
id: id.clone(),
input: parsed_args,
});
}
yield Ok(StreamChunk::Complete {
input_tokens: 0,
output_tokens: 0,
@@ -150,17 +212,65 @@ impl LlmDriver for OpenAiDriver {
Ok(resp) => {
if let Some(choice) = resp.choices.first() {
let delta = &choice.delta;
// Handle text content
if let Some(content) = &delta.content {
yield Ok(StreamChunk::TextDelta { delta: content.clone() });
if !content.is_empty() {
yield Ok(StreamChunk::TextDelta { delta: content.clone() });
}
}
// Handle tool calls
if let Some(tool_calls) = &delta.tool_calls {
tracing::trace!("[OpenAI] Received tool_calls delta: {:?}", tool_calls);
for tc in tool_calls {
// Tool call start - has id and name
if let Some(id) = &tc.id {
// Get function name if available
let name = tc.function.as_ref()
.and_then(|f| f.name.clone())
.unwrap_or_default();
// Only emit ToolUseStart if we have a valid tool name
if !name.is_empty() {
tracing::debug!("[OpenAI] ToolUseStart: id={}, name={}", id, name);
current_tool_id = Some(id.clone());
accumulated_tool_calls.insert(id.clone(), (name.clone(), String::new()));
yield Ok(StreamChunk::ToolUseStart {
id: id.clone(),
name,
});
} else {
tracing::debug!("[OpenAI] Tool call with empty name, waiting for name delta: id={}", id);
// Still track the tool call but don't emit yet
current_tool_id = Some(id.clone());
accumulated_tool_calls.insert(id.clone(), (String::new(), String::new()));
}
}
// Tool call delta - has arguments
if let Some(function) = &tc.function {
tracing::trace!("[OpenAI] Function delta: name={:?}, arguments={:?}", function.name, function.arguments);
if let Some(args) = &function.arguments {
tracing::debug!("[OpenAI] ToolUseDelta: args={}", args);
// Try to find the tool by id or use current
let tool_id = tc.id.as_ref()
.or(current_tool_id.as_ref())
.cloned()
.unwrap_or_default();
yield Ok(StreamChunk::ToolUseDelta {
id: tc.id.clone().unwrap_or_default(),
id: tool_id.clone(),
delta: args.clone(),
});
// Accumulate arguments
if let Some(entry) = accumulated_tool_calls.get_mut(&tool_id) {
tracing::debug!("[OpenAI] Accumulating args for tool {}: '{}' -> '{}'", tool_id, args, entry.1);
entry.1.push_str(args);
} else {
tracing::warn!("[OpenAI] No entry found for tool_id '{}' to accumulate args", tool_id);
}
}
}
}
@@ -168,7 +278,7 @@ impl LlmDriver for OpenAiDriver {
}
}
Err(e) => {
tracing::warn!("Failed to parse OpenAI SSE: {}", e);
tracing::warn!("[OpenAI] Failed to parse SSE: {}, data: {}", e, data);
}
}
}
@@ -212,19 +322,27 @@ impl OpenAiDriver {
content: Some(content.clone()),
tool_calls: None,
}),
zclaw_types::Message::ToolUse { id, tool, input } => Some(OpenAiMessage {
role: "assistant".to_string(),
content: None,
tool_calls: Some(vec![OpenAiToolCall {
id: id.clone(),
r#type: "function".to_string(),
function: FunctionCall {
name: tool.to_string(),
arguments: serde_json::to_string(input).unwrap_or_default(),
},
}]),
}),
zclaw_types::Message::ToolResult { tool_call_id, output, is_error, .. } => Some(OpenAiMessage {
zclaw_types::Message::ToolUse { id, tool, input } => {
// Ensure arguments is always a valid JSON object, never null or invalid
let args = if input.is_null() {
"{}".to_string()
} else {
serde_json::to_string(input).unwrap_or_else(|_| "{}".to_string())
};
Some(OpenAiMessage {
role: "assistant".to_string(),
content: None,
tool_calls: Some(vec![OpenAiToolCall {
id: id.clone(),
r#type: "function".to_string(),
function: FunctionCall {
name: tool.to_string(),
arguments: args,
},
}]),
})
}
zclaw_types::Message::ToolResult { tool_call_id: _, output, is_error, .. } => Some(OpenAiMessage {
role: "tool".to_string(),
content: Some(if *is_error {
format!("Error: {}", output)
@@ -272,17 +390,32 @@ impl OpenAiDriver {
fn convert_response(&self, api_response: OpenAiResponse, model: String) -> CompletionResponse {
let choice = api_response.choices.first();
tracing::debug!("[OpenAiDriver:convert_response] Processing response: {} choices, first choice: {:?}", api_response.choices.len(), choice.map(|c| format!("content={:?}, tool_calls={:?}, finish_reason={:?}", c.message.content, c.message.tool_calls.as_ref().map(|tc| tc.len()), c.finish_reason)));
let (content, stop_reason) = match choice {
Some(c) => {
let blocks = if let Some(text) = &c.message.content {
vec![ContentBlock::Text { text: text.clone() }]
} else if let Some(tool_calls) = &c.message.tool_calls {
// Priority: tool_calls > non-empty content > empty content
// This is important because some providers return empty content with tool_calls
let has_tool_calls = c.message.tool_calls.as_ref().map(|tc| !tc.is_empty()).unwrap_or(false);
let has_content = c.message.content.as_ref().map(|t| !t.is_empty()).unwrap_or(false);
let blocks = if has_tool_calls {
// Tool calls take priority
let tool_calls = c.message.tool_calls.as_ref().unwrap();
tracing::debug!("[OpenAiDriver:convert_response] Using tool_calls: {} calls", tool_calls.len());
tool_calls.iter().map(|tc| ContentBlock::ToolUse {
id: tc.id.clone(),
name: tc.function.name.clone(),
input: serde_json::from_str(&tc.function.arguments).unwrap_or(serde_json::Value::Null),
}).collect()
} else if has_content {
// Non-empty content
let text = c.message.content.as_ref().unwrap();
tracing::debug!("[OpenAiDriver:convert_response] Using text content: {} chars", text.len());
vec![ContentBlock::Text { text: text.clone() }]
} else {
// No content or tool_calls
tracing::debug!("[OpenAiDriver:convert_response] No content or tool_calls, using empty text");
vec![ContentBlock::Text { text: String::new() }]
};
@@ -295,7 +428,10 @@ impl OpenAiDriver {
(blocks, stop)
}
None => (vec![ContentBlock::Text { text: String::new() }], StopReason::EndTurn),
None => {
tracing::debug!("[OpenAiDriver:convert_response] No choices in response");
(vec![ContentBlock::Text { text: String::new() }], StopReason::EndTurn)
}
};
let (input_tokens, output_tokens) = api_response.usage
@@ -310,6 +446,119 @@ impl OpenAiDriver {
stop_reason,
}
}
/// Convert a non-streaming completion to a stream for providers that don't
/// support streaming with tools (e.g. DashScope rejects `stream=true` when
/// `tools` is present with an HTTP 400).
///
/// Performs a single blocking `chat/completions` call, then replays the
/// response as `StreamChunk`s: text/thinking deltas, a full
/// ToolUseStart/Delta/End triple per tool call, and a final `Complete`.
fn stream_from_complete(&self, request: CompletionRequest) -> Pin<Box<dyn Stream<Item = Result<StreamChunk>> + Send + '_>> {
    // Build the request with streaming explicitly disabled.
    let mut complete_request = self.build_api_request(&request);
    complete_request.stream = false;
    // Capture owned copies before entering the generator so the async
    // stream only borrows `self` for the client and convert_response.
    let base_url = self.base_url.clone();
    let api_key = self.api_key.expose_secret().to_string();
    let model = request.model.clone();
    Box::pin(stream! {
        let url = format!("{}/chat/completions", base_url);
        tracing::debug!("[OpenAiDriver:stream_from_complete] Sending non-streaming request to: {}", url);
        let response = match self.client
            .post(&url)
            .header("Authorization", format!("Bearer {}", api_key))
            .header("Content-Type", "application/json")
            .timeout(std::time::Duration::from_secs(120))
            .json(&complete_request)
            .send()
            .await
        {
            Ok(r) => r,
            Err(e) => {
                yield Err(ZclawError::LlmError(format!("HTTP request failed: {}", e)));
                return;
            }
        };
        if !response.status().is_success() {
            let status = response.status();
            let body = response.text().await.unwrap_or_default();
            yield Err(ZclawError::LlmError(format!("API error {}: {}", status, body)));
            return;
        }
        let api_response: OpenAiResponse = match response.json().await {
            Ok(r) => r,
            Err(e) => {
                tracing::error!("[OpenAiDriver:stream_from_complete] Failed to parse response: {}", e);
                yield Err(ZclawError::LlmError(format!("Failed to parse response: {}", e)));
                return;
            }
        };
        tracing::debug!("[OpenAiDriver:stream_from_complete] Got response with {} choices", api_response.choices.len());
        if let Some(choice) = api_response.choices.first() {
            // Truncate the logged preview on a char boundary: a raw
            // `&c[..100]` panics when byte 100 falls inside a multi-byte
            // UTF-8 sequence, which is likely with CJK content from these
            // providers.
            let preview = choice.message.content.as_ref().map(|c| {
                c.char_indices()
                    .nth(100)
                    .map_or(c.as_str(), |(i, _)| &c[..i])
            });
            tracing::debug!("[OpenAiDriver:stream_from_complete] First choice: content={:?}, tool_calls={:?}, finish_reason={:?}",
                preview,
                choice.message.tool_calls.as_ref().map(|tc| tc.len()),
                choice.finish_reason);
        }
        // Convert the full response, then replay it as stream chunks.
        let completion = self.convert_response(api_response, model.clone());
        tracing::debug!("[OpenAiDriver:stream_from_complete] Converted to {} content blocks, stop_reason: {:?}", completion.content.len(), completion.stop_reason);
        for block in &completion.content {
            tracing::trace!("[OpenAiDriver:stream_from_complete] Emitting block: {:?}", block);
            match block {
                ContentBlock::Text { text } => {
                    // Skip empty text so downstream consumers don't see
                    // zero-length deltas (providers may pair empty content
                    // with tool_calls).
                    if !text.is_empty() {
                        yield Ok(StreamChunk::TextDelta { delta: text.clone() });
                    }
                }
                ContentBlock::Thinking { thinking } => {
                    yield Ok(StreamChunk::ThinkingDelta { delta: thinking.clone() });
                }
                ContentBlock::ToolUse { id, name, input } => {
                    tracing::debug!("[OpenAiDriver:stream_from_complete] Emitting ToolUse: id={}, name={}", id, name);
                    yield Ok(StreamChunk::ToolUseStart {
                        id: id.clone(),
                        name: name.clone(),
                    });
                    // Only emit an arguments delta when there is something
                    // to send; Null input means the provider sent no args.
                    if !input.is_null() {
                        let args_str = serde_json::to_string(input).unwrap_or_default();
                        yield Ok(StreamChunk::ToolUseDelta {
                            id: id.clone(),
                            delta: args_str,
                        });
                    }
                    yield Ok(StreamChunk::ToolUseEnd {
                        id: id.clone(),
                        input: input.clone(),
                    });
                }
            }
        }
        // Final chunk mirrors the streaming path's Complete payload.
        yield Ok(StreamChunk::Complete {
            input_tokens: completion.input_tokens,
            output_tokens: completion.output_tokens,
            stop_reason: match completion.stop_reason {
                StopReason::EndTurn => "end_turn",
                StopReason::MaxTokens => "max_tokens",
                StopReason::ToolUse => "tool_use",
                StopReason::StopSequence => "stop",
                StopReason::Error => "error",
            }.to_string(),
        });
    })
}
}
// OpenAI API types
@@ -460,6 +709,8 @@ struct OpenAiToolCallDelta {
/// Incremental `function` payload inside a streamed tool-call delta
/// (`choices[].delta.tool_calls[].function`). Both fields are optional
/// because providers split a call across chunks: the name may arrive in
/// an early chunk while later chunks carry only argument fragments (the
/// streaming loop above accumulates them and tolerates an initially
/// empty name).
#[derive(Debug, Deserialize)]
struct OpenAiFunctionDelta {
    /// Function name; may be absent in chunks that only extend arguments.
    #[serde(default)]
    name: Option<String>,
    /// Fragment of the JSON-encoded arguments string; concatenated by
    /// the caller before parsing.
    #[serde(default)]
    arguments: Option<String>,
}