fix(runtime): SSE行缓冲 — 修复GLM tool call参数因SSE行截断而丢失
Some checks failed
CI / Lint & TypeCheck (push) Has been cancelled
CI / Unit Tests (push) Has been cancelled
CI / Build Frontend (push) Has been cancelled
CI / Rust Check (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
CI / E2E Tests (push) Has been cancelled

根因: OpenAI driver的SSE解析直接按TCP chunk分行,
当GLM的JSON响应被拆成多个TCP包时,SSE data行被截断,
导致tool call arguments丢失(input={})。

修复:
1. 添加pending_line缓冲区,跨chunk累积不完整的SSE行
2. 只处理完整的行(\n结尾),未完成的保留到下次
3. researcher.infer_action()增加更多字段推断(search/keyword/q等)

验证: 99 tests PASS, 160 hands tests PASS
This commit is contained in:
iven
2026-04-22 15:20:23 +08:00
parent bc9537cd80
commit 3cb9709caf
2 changed files with 100 additions and 28 deletions

View File

@@ -306,6 +306,33 @@ impl ResearcherHand {
/// including the enum tag, e.g. sending `{"query": "search terms"}` instead
/// of `{"action": "search", "query": "search terms"}`.
fn infer_action(input: &Value) -> Result<ResearcherAction> {
// Debug: log all keys in the input
let keys: Vec<&str> = input.as_object()
.map(|obj| obj.keys().map(|k| k.as_str()).collect())
.unwrap_or_default();
tracing::warn!(target: "researcher", ?keys, %input, "infer_action examining input");
// Check for action field with wrong value
if let Some(action) = input.get("action").and_then(|v| v.as_str()) {
if action == "search" || action == "report" {
if let Some(query_val) = input.get("query") {
let query = Self::parse_query(query_val);
if !query.query.trim().is_empty() {
return Ok(if action == "report" {
ResearcherAction::Report { query }
} else {
ResearcherAction::Search { query }
});
}
}
}
if action == "fetch" {
if let Some(url) = input.get("url").and_then(|v| v.as_str()) {
return Ok(ResearcherAction::Fetch { url: url.to_string() });
}
}
}
// Has "url" (singular) → fetch
if let Some(url) = input.get("url").and_then(|v| v.as_str()) {
if !url.is_empty() && url.starts_with("http") {
@@ -321,10 +348,47 @@ impl ResearcherHand {
return Ok(ResearcherAction::Summarize { urls: url_list });
}
}
// Has "query" (string or object) → search
// Has "query" → search
if let Some(query_val) = input.get("query") {
let query: ResearchQuery = if query_val.is_string() {
// LLM sent plain string: {"query": "search terms"}
let query = Self::parse_query(query_val);
if !query.query.trim().is_empty() {
return Ok(ResearcherAction::Search { query });
}
}
// Has "search" or "search_query" → search
for key in &["search", "search_query", "keyword", "keywords", "q", "text"] {
if let Some(val) = input.get(key) {
let query = Self::parse_query(val);
if !query.query.trim().is_empty() {
return Ok(ResearcherAction::Search { query });
}
}
}
// Last resort: if any string field looks like a search query
if let Some(obj) = input.as_object() {
for (key, val) in obj {
if let Some(s) = val.as_str() {
if s.len() > 2 && !s.starts_with("http") && key != "action" && key != "engine" {
tracing::warn!(target: "researcher", key = %key, value = %s, "Using fallback field as query");
return Ok(ResearcherAction::Search { query: ResearchQuery {
query: s.to_string(),
engine: SearchEngine::Auto,
depth: ResearchDepth::Standard,
max_results: 10,
include_related: false,
time_limit_secs: 60,
}});
}
}
}
}
Err(zclaw_types::ZclawError::HandError(
"无法识别搜索意图:请提供 query搜索或 url获取网页参数".to_string()
))
}
fn parse_query(query_val: &Value) -> ResearchQuery {
if query_val.is_string() {
ResearchQuery {
query: query_val.as_str().unwrap_or("").to_string(),
engine: SearchEngine::Auto,
@@ -334,9 +398,11 @@ impl ResearcherHand {
time_limit_secs: 60,
}
} else {
// LLM sent object: {"query": {"query": "...", "engine": "..."}}
serde_json::from_value(query_val.clone()).unwrap_or_else(|_| ResearchQuery {
query: query_val.get("query")
.or_else(|| query_val.get("search"))
.or_else(|| query_val.get("q"))
.or_else(|| query_val.get("keyword"))
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string(),
@@ -346,15 +412,8 @@ impl ResearcherHand {
include_related: false,
time_limit_secs: 60,
})
};
if !query.query.trim().is_empty() {
return Ok(ResearcherAction::Search { query });
}
}
Err(zclaw_types::ZclawError::HandError(
"无法识别搜索意图:请提供 query搜索或 url获取网页参数".to_string()
))
}
/// Execute a web search — route to the configured backend
async fn execute_search(&self, query: &ResearchQuery) -> Result<Vec<SearchResult>> {
@@ -1085,10 +1144,14 @@ impl Hand for ResearcherHand {
}
async fn execute(&self, _context: &HandContext, input: Value) -> Result<HandResult> {
tracing::info!(target: "researcher", input = %input, "Researcher hand received input");
// Try strict deserialization first, then fall back to inference
let action: ResearcherAction = match serde_json::from_value(input.clone()) {
Ok(a) => a,
Err(_) => Self::infer_action(&input)?,
Err(e) => {
tracing::warn!(target: "researcher", error = %e, input = %input, "Strict deserialization failed, trying inference");
Self::infer_action(&input)?
}
};
let start = std::time::Instant::now();

View File

@@ -163,6 +163,7 @@ impl LlmDriver for OpenAiDriver {
let mut current_tool_id: Option<String> = None;
let mut sse_event_count: usize = 0;
let mut raw_bytes_total: usize = 0;
let mut pending_line = String::new(); // Buffer for incomplete SSE lines
while let Some(chunk_result) = byte_stream.next().await {
let chunk = match chunk_result {
@@ -180,13 +181,21 @@ impl LlmDriver for OpenAiDriver {
if raw_bytes_total <= 600 {
tracing::debug!("[OpenAI:stream] RAW chunk ({} bytes): {:?}", text.len(), &text[..text.len().min(500)]);
}
for line in text.lines() {
// Accumulate text and split by lines, handling incomplete last line
pending_line.push_str(&text);
// Extract complete lines (ending with \n), keep the rest pending
let mut complete_lines: Vec<String> = Vec::new();
while let Some(pos) = pending_line.find('\n') {
complete_lines.push(pending_line[..pos].to_string());
pending_line = pending_line[pos + 1..].to_string();
}
for line in complete_lines {
let trimmed = line.trim();
if trimmed.is_empty() || trimmed.starts_with(':') {
continue; // Skip empty lines and SSE comments
}
// Handle both "data: " (standard) and "data:" (no space)
let data = if let Some(d) = trimmed.strip_prefix("data: ") {
let data: Option<&str> = if let Some(d) = trimmed.strip_prefix("data: ") {
Some(d)
} else if let Some(d) = trimmed.strip_prefix("data:") {
Some(d.trim_start())