refactor: 统一项目名称从OpenFang到ZCLAW
Some checks failed
CI / Lint & TypeCheck (push) Has been cancelled
CI / Unit Tests (push) Has been cancelled
CI / Build Frontend (push) Has been cancelled
CI / Rust Check (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
CI / E2E Tests (push) Has been cancelled

重构所有代码和文档中的项目名称,将OpenFang统一更新为ZCLAW。包括:
- 配置文件中的项目名称
- 代码注释和文档引用
- 环境变量和路径
- 类型定义和接口名称
- 测试用例和模拟数据

同时优化部分代码结构,移除未使用的模块,并更新相关依赖项。
This commit is contained in:
iven
2026-03-27 07:36:03 +08:00
parent 4b08804aa9
commit 0d4fa96b82
226 changed files with 7288 additions and 5788 deletions

View File

@@ -0,0 +1,365 @@
//! Context compaction for the agent loop.
//!
//! Provides rule-based token estimation and message compaction to prevent
//! conversations from exceeding LLM context windows. When the estimated
//! token count exceeds the configured threshold, older messages are
//! summarized into a single system message and only recent messages are
//! retained.
use zclaw_types::Message;
/// Number of recent messages to preserve after compaction.
const DEFAULT_KEEP_RECENT: usize = 6;
/// Heuristic token count estimation.
///
/// CJK characters ≈ 1.5 tokens each, English words ≈ 1.3 tokens each.
/// Intentionally conservative (overestimates) to avoid hitting real limits.
/// Heuristic token count estimation.
///
/// CJK characters ≈ 1.5 tokens each, English words ≈ 1.3 tokens each.
/// Intentionally conservative (overestimates) to avoid hitting real limits.
pub fn estimate_tokens(text: &str) -> usize {
    if text.is_empty() {
        return 0;
    }
    // Sum per-character weights left-to-right (same order as a plain loop).
    let total: f64 = text
        .chars()
        .map(|ch| {
            let cp = ch as u32;
            let is_cjk_ideograph = (0x4E00..=0x9FFF).contains(&cp)
                || (0x3400..=0x4DBF).contains(&cp)
                || (0x20000..=0x2A6DF).contains(&cp)
                || (0xF900..=0xFAFF).contains(&cp);
            if is_cjk_ideograph {
                // CJK ideographs — ~1.5 tokens each
                1.5
            } else if (0x3000..=0x303F).contains(&cp) || (0xFF00..=0xFFEF).contains(&cp) {
                // CJK / fullwidth punctuation — ~1.0 token
                1.0
            } else if matches!(ch, ' ' | '\n' | '\t') {
                // whitespace is cheap
                0.25
            } else {
                // ASCII / Latin characters — roughly 4 chars per token
                0.3
            }
        })
        .sum();
    total.ceil() as usize
}
/// Estimate total tokens for a list of messages (including framing overhead).
pub fn estimate_messages_tokens(messages: &[Message]) -> usize {
let mut total = 0;
for msg in messages {
match msg {
Message::User { content } => {
total += estimate_tokens(content);
total += 4;
}
Message::Assistant { content, thinking } => {
total += estimate_tokens(content);
if let Some(th) = thinking {
total += estimate_tokens(th);
}
total += 4;
}
Message::System { content } => {
total += estimate_tokens(content);
total += 4;
}
Message::ToolUse { input, .. } => {
total += estimate_tokens(&input.to_string());
total += 4;
}
Message::ToolResult { output, .. } => {
total += estimate_tokens(&output.to_string());
total += 4;
}
}
}
total
}
/// Compact a message list by summarizing old messages and keeping recent ones.
///
/// When `messages.len() > keep_recent`, the oldest messages are summarized
/// into a single system message. System messages at the beginning of the
/// conversation are always preserved.
///
/// Returns the compacted message list and the number of original messages removed.
/// Compact a message list by summarizing old messages and keeping recent ones.
///
/// When `messages.len() > keep_recent`, the oldest messages are summarized
/// into a single system message. System messages at the beginning of the
/// conversation are always preserved.
///
/// Returns the compacted message list and the number of original messages removed.
pub fn compact_messages(messages: Vec<Message>, keep_recent: usize) -> (Vec<Message>, usize) {
    if messages.len() <= keep_recent {
        return (messages, 0);
    }
    // Leading system messages carry prior compaction summaries; never summarize them away.
    let leading_system = messages
        .iter()
        .take_while(|m| matches!(m, Message::System { .. }))
        .count();
    // How many trailing messages to keep, bounded so the leading system
    // block is never counted into the tail.
    let tail_len = keep_recent.min(messages.len().saturating_sub(leading_system));
    // Split point: everything before it is summarized; clamp so leading
    // system messages always land on the "old" side.
    let split_at = messages.len().saturating_sub(tail_len).max(leading_system);
    if split_at == 0 {
        return (messages, 0);
    }
    let (old, recent) = messages.split_at(split_at);
    let summary = generate_summary(old);
    let removed = old.len();
    let mut compacted = Vec::with_capacity(1 + recent.len());
    compacted.push(Message::system(summary));
    compacted.extend(recent.iter().cloned());
    (compacted, removed)
}
/// Check if compaction should be triggered and perform it if needed.
///
/// Returns the (possibly compacted) message list.
/// Check if compaction should be triggered and perform it if needed.
///
/// Returns the (possibly compacted) message list.
pub fn maybe_compact(messages: Vec<Message>, threshold: usize) -> Vec<Message> {
    let estimated = estimate_messages_tokens(&messages);
    // Below threshold: hand the list back untouched.
    if estimated < threshold {
        return messages;
    }
    tracing::info!(
        "[Compaction] Triggered: {} tokens > {} threshold, {} messages",
        estimated,
        threshold,
        messages.len(),
    );
    let (kept, dropped) = compact_messages(messages, DEFAULT_KEEP_RECENT);
    tracing::info!(
        "[Compaction] Removed {} messages, {} remain",
        dropped,
        kept.len(),
    );
    kept
}
/// Generate a rule-based summary of old messages.
/// Generate a rule-based summary of old messages.
///
/// Builds a Chinese-language digest: a header marker, any inline system
/// prompts (truncated), tool invocations, up to 8 user topics, and final
/// user/assistant counts. The result is capped at 800 characters.
fn generate_summary(messages: &[Message]) -> String {
    if messages.is_empty() {
        return "[对话开始]".to_string();
    }
    let mut sections: Vec<String> = vec!["[以下是之前对话的摘要]".to_string()];
    let mut user_count = 0;
    let mut assistant_count = 0;
    let mut topics: Vec<String> = Vec::new();
    for msg in messages {
        match msg {
            Message::User { content } => {
                user_count += 1;
                let topic = extract_topic(content);
                if let Some(t) = topic {
                    topics.push(t);
                }
            }
            Message::Assistant { .. } => {
                assistant_count += 1;
            }
            Message::System { content } => {
                // Skip system messages that are previous compaction summaries
                if !content.starts_with("[以下是之前对话的摘要]") {
                    sections.push(format!("系统提示: {}", truncate(content, 60)));
                }
            }
            Message::ToolUse { tool, .. } => {
                sections.push(format!("工具调用: {}", tool.as_str()));
            }
            Message::ToolResult { .. } => {
                // Skip tool results in summary
            }
        }
    }
    if !topics.is_empty() {
        let topic_list: Vec<String> = topics.iter().take(8).cloned().collect();
        sections.push(format!("讨论主题: {}", topic_list.join("; ")));
    }
    sections.push(format!(
        "(已压缩 {} 条消息,其中用户 {} 条,助手 {} 条)",
        messages.len(),
        user_count,
        assistant_count,
    ));
    let summary = sections.join("\n");
    // Enforce max length. Truncate by character count, not byte index:
    // the summary is full of multi-byte CJK text, and the previous
    // `&summary[..max_chars]` byte slice could land mid-character and panic.
    let max_chars = 800;
    if summary.chars().count() > max_chars {
        let head: String = summary.chars().take(max_chars).collect();
        format!("{}...\n(摘要已截断)", head)
    } else {
        summary
    }
}
/// Extract the main topic from a user message (first sentence or first 50 chars).
/// Extract the main topic from a user message (first sentence or first 50 chars).
///
/// Returns `None` for whitespace-only input. A sentence terminator (CJK
/// full stop `。`, fullwidth `！`/`？`, or a newline) ends the topic when it
/// falls within the first 80 bytes; otherwise short messages are returned
/// whole and long ones are cut at 50 chars with an ellipsis.
fn extract_topic(content: &str) -> Option<String> {
    let trimmed = content.trim();
    if trimmed.is_empty() {
        return None;
    }
    // Find sentence end markers. The original source had empty char
    // literals here (mangled fullwidth punctuation); restored as ！ and ？.
    for (i, ch) in trimmed.char_indices() {
        if ch == '。' || ch == '！' || ch == '？' || ch == '\n' {
            // End of the first sentence, inclusive of the terminator.
            let end = i + ch.len_utf8();
            if end <= 80 {
                return Some(trimmed[..end].trim().to_string());
            }
            break;
        }
    }
    if trimmed.chars().count() <= 50 {
        return Some(trimmed.to_string());
    }
    Some(format!("{}...", trimmed.chars().take(50).collect::<String>()))
}
/// Truncate text to max_chars at char boundary.
/// Truncate `text` to at most `max_chars` characters, appending "..." when cut.
fn truncate(text: &str, max_chars: usize) -> String {
    // `nth(max_chars)` is Some exactly when the text has more than
    // `max_chars` characters; its byte index is a safe slice boundary.
    match text.char_indices().nth(max_chars) {
        None => text.to_string(),
        Some((byte_idx, _)) => format!("{}...", &text[..byte_idx]),
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Empty input costs zero tokens.
    #[test]
    fn test_estimate_tokens_empty() {
        assert_eq!(estimate_tokens(""), 0);
    }

    /// Latin text yields a positive estimate.
    #[test]
    fn test_estimate_tokens_english() {
        let tokens = estimate_tokens("Hello world");
        assert!(tokens > 0);
    }

    /// CJK text is weighted more heavily than Latin text.
    #[test]
    fn test_estimate_tokens_cjk() {
        let tokens = estimate_tokens("你好世界");
        assert!(tokens > 3); // CJK chars are ~1.5 tokens each
    }

    /// Message-level estimation includes per-message framing overhead.
    #[test]
    fn test_estimate_messages_tokens() {
        let messages = vec![
            Message::user("Hello"),
            Message::assistant("Hi there"),
        ];
        let tokens = estimate_messages_tokens(&messages);
        assert!(tokens > 0);
    }

    /// No compaction when the list fits within the keep_recent budget.
    #[test]
    fn test_compact_messages_under_threshold() {
        let messages = vec![
            Message::user("Hello"),
            Message::assistant("Hi"),
        ];
        let (result, removed) = compact_messages(messages, 6);
        assert_eq!(removed, 0);
        assert_eq!(result.len(), 2);
    }

    /// Over-budget lists collapse into one summary plus the recent tail.
    #[test]
    fn test_compact_messages_over_threshold() {
        let messages: Vec<Message> = (0..10)
            .flat_map(|i| {
                vec![
                    Message::user(format!("Question {}", i)),
                    Message::assistant(format!("Answer {}", i)),
                ]
            })
            .collect();
        let (result, removed) = compact_messages(messages, 4);
        assert!(removed > 0);
        // Should have: 1 summary + 4 recent messages
        assert_eq!(result.len(), 5);
        // First message should be a system summary
        assert!(matches!(&result[0], Message::System { .. }));
    }

    /// A leading system prompt still yields a system message up front.
    #[test]
    fn test_compact_preserves_leading_system() {
        let messages = vec![
            Message::system("You are helpful"),
            Message::user("Q1"),
            Message::assistant("A1"),
            Message::user("Q2"),
            Message::assistant("A2"),
            Message::user("Q3"),
            Message::assistant("A3"),
        ];
        let (result, removed) = compact_messages(messages, 4);
        assert!(removed > 0);
        // Should start with compaction summary, then recent messages
        assert!(matches!(&result[0], Message::System { .. }));
    }

    /// Below the token threshold the list passes through untouched.
    #[test]
    fn test_maybe_compact_under_threshold() {
        let messages = vec![
            Message::user("Short message"),
            Message::assistant("Short reply"),
        ];
        let result = maybe_compact(messages, 100_000);
        assert_eq!(result.len(), 2);
    }

    /// Topic extraction keeps the core of a short CJK question.
    #[test]
    fn test_extract_topic_sentence() {
        let topic = extract_topic("什么是Rust的所有权系统").unwrap();
        assert!(topic.contains("所有权"));
    }

    /// Short messages are returned verbatim.
    #[test]
    fn test_extract_topic_short() {
        let topic = extract_topic("Hello").unwrap();
        assert_eq!(topic, "Hello");
    }

    /// Long messages are truncated with an ellipsis.
    #[test]
    fn test_extract_topic_long() {
        let long = "This is a very long message that exceeds fifty characters in total length";
        let topic = extract_topic(long).unwrap();
        assert!(topic.ends_with("..."));
    }

    /// Summaries include the header marker and the message counts.
    #[test]
    fn test_generate_summary() {
        let messages = vec![
            Message::user("What is Rust?"),
            Message::assistant("Rust is a systems programming language"),
            Message::user("How does ownership work?"),
            Message::assistant("Ownership is Rust's memory management system"),
        ];
        let summary = generate_summary(&messages);
        assert!(summary.contains("摘要"));
        assert!(summary.contains("2"));
    }
}

View File

@@ -1,9 +1,17 @@
//! Google Gemini driver implementation
//!
//! Implements the Gemini REST API v1beta with full support for:
//! - Text generation (complete and streaming)
//! - Tool / function calling
//! - System instructions
//! - Token usage reporting
use async_trait::async_trait;
use futures::Stream;
use async_stream::stream;
use futures::{Stream, StreamExt};
use secrecy::{ExposeSecret, SecretString};
use reqwest::Client;
use serde::{Deserialize, Serialize};
use std::pin::Pin;
use zclaw_types::{Result, ZclawError};
@@ -11,7 +19,6 @@ use super::{CompletionRequest, CompletionResponse, ContentBlock, LlmDriver, Stop
use crate::stream::StreamChunk;
/// Google Gemini driver
#[allow(dead_code)] // TODO: Implement full Gemini API support
pub struct GeminiDriver {
client: Client,
api_key: SecretString,
@@ -21,11 +28,31 @@ pub struct GeminiDriver {
impl GeminiDriver {
pub fn new(api_key: SecretString) -> Self {
Self {
client: Client::new(),
client: Client::builder()
.user_agent(crate::USER_AGENT)
.http1_only()
.timeout(std::time::Duration::from_secs(120))
.connect_timeout(std::time::Duration::from_secs(30))
.build()
.unwrap_or_else(|_| Client::new()),
api_key,
base_url: "https://generativelanguage.googleapis.com/v1beta".to_string(),
}
}
pub fn with_base_url(api_key: SecretString, base_url: String) -> Self {
Self {
client: Client::builder()
.user_agent(crate::USER_AGENT)
.http1_only()
.timeout(std::time::Duration::from_secs(120))
.connect_timeout(std::time::Duration::from_secs(30))
.build()
.unwrap_or_else(|_| Client::new()),
api_key,
base_url,
}
}
}
#[async_trait]
@@ -39,25 +66,594 @@ impl LlmDriver for GeminiDriver {
}
async fn complete(&self, request: CompletionRequest) -> Result<CompletionResponse> {
// TODO: Implement actual API call
Ok(CompletionResponse {
content: vec![ContentBlock::Text {
text: "Gemini driver not yet implemented".to_string(),
}],
model: request.model,
input_tokens: 0,
output_tokens: 0,
stop_reason: StopReason::EndTurn,
})
let api_request = self.build_api_request(&request);
let url = format!(
"{}/models/{}:generateContent?key={}",
self.base_url,
request.model,
self.api_key.expose_secret()
);
tracing::debug!(target: "gemini_driver", "Sending request to: {}", url);
let response = self.client
.post(&url)
.header("content-type", "application/json")
.json(&api_request)
.send()
.await
.map_err(|e| ZclawError::LlmError(format!("HTTP request failed: {}", e)))?;
if !response.status().is_success() {
let status = response.status();
let body = response.text().await.unwrap_or_default();
tracing::warn!(target: "gemini_driver", "API error {}: {}", status, body);
return Err(ZclawError::LlmError(format!("API error {}: {}", status, body)));
}
let api_response: GeminiResponse = response
.json()
.await
.map_err(|e| ZclawError::LlmError(format!("Failed to parse response: {}", e)))?;
Ok(self.convert_response(api_response, request.model))
}
fn stream(
&self,
_request: CompletionRequest,
request: CompletionRequest,
) -> Pin<Box<dyn Stream<Item = Result<StreamChunk>> + Send + '_>> {
// Placeholder - return error stream
Box::pin(futures::stream::once(async {
Err(ZclawError::LlmError("Gemini streaming not yet implemented".to_string()))
}))
let api_request = self.build_api_request(&request);
let url = format!(
"{}/models/{}:streamGenerateContent?alt=sse&key={}",
self.base_url,
request.model,
self.api_key.expose_secret()
);
tracing::debug!(target: "gemini_driver", "Starting stream request to: {}", url);
Box::pin(stream! {
let response = match self.client
.post(&url)
.header("content-type", "application/json")
.timeout(std::time::Duration::from_secs(120))
.json(&api_request)
.send()
.await
{
Ok(r) => {
tracing::debug!(target: "gemini_driver", "Stream response status: {}", r.status());
r
},
Err(e) => {
tracing::error!(target: "gemini_driver", "HTTP request failed: {:?}", e);
yield Err(ZclawError::LlmError(format!("HTTP request failed: {}", e)));
return;
}
};
if !response.status().is_success() {
let status = response.status();
let body = response.text().await.unwrap_or_default();
yield Err(ZclawError::LlmError(format!("API error {}: {}", status, body)));
return;
}
let mut byte_stream = response.bytes_stream();
let mut accumulated_tool_calls: std::collections::HashMap<usize, (String, String)> = std::collections::HashMap::new();
while let Some(chunk_result) = byte_stream.next().await {
let chunk = match chunk_result {
Ok(c) => c,
Err(e) => {
yield Err(ZclawError::LlmError(format!("Stream error: {}", e)));
continue;
}
};
let text = String::from_utf8_lossy(&chunk);
for line in text.lines() {
if let Some(data) = line.strip_prefix("data: ") {
match serde_json::from_str::<GeminiStreamResponse>(data) {
Ok(resp) => {
if let Some(candidate) = resp.candidates.first() {
let content = match &candidate.content {
Some(c) => c,
None => continue,
};
let parts = &content.parts;
for (idx, part) in parts.iter().enumerate() {
// Handle text content
if let Some(text) = &part.text {
if !text.is_empty() {
yield Ok(StreamChunk::TextDelta { delta: text.clone() });
}
}
// Handle function call (tool use)
if let Some(fc) = &part.function_call {
let name = fc.name.clone().unwrap_or_default();
let args = fc.args.clone().unwrap_or(serde_json::Value::Object(Default::default()));
// Emit ToolUseStart if this is a new tool call
if !accumulated_tool_calls.contains_key(&idx) {
accumulated_tool_calls.insert(idx, (name.clone(), String::new()));
yield Ok(StreamChunk::ToolUseStart {
id: format!("gemini_call_{}", idx),
name,
});
}
// Emit the function arguments as delta
let args_str = serde_json::to_string(&args).unwrap_or_default();
let call_id = format!("gemini_call_{}", idx);
yield Ok(StreamChunk::ToolUseDelta {
id: call_id.clone(),
delta: args_str.clone(),
});
// Accumulate
if let Some(entry) = accumulated_tool_calls.get_mut(&idx) {
entry.1 = args_str;
}
}
}
// When the candidate is finished, emit ToolUseEnd for all pending
if let Some(ref finish_reason) = candidate.finish_reason {
let is_final = finish_reason == "STOP" || finish_reason == "MAX_TOKENS";
if is_final {
// Emit ToolUseEnd for all accumulated tool calls
for (idx, (_name, args_str)) in &accumulated_tool_calls {
let input: serde_json::Value = if args_str.is_empty() {
serde_json::json!({})
} else {
serde_json::from_str(args_str).unwrap_or_else(|e| {
tracing::warn!(target: "gemini_driver", "Failed to parse tool args '{}': {}", args_str, e);
serde_json::json!({})
})
};
yield Ok(StreamChunk::ToolUseEnd {
id: format!("gemini_call_{}", idx),
input,
});
}
// Extract usage metadata from the response
let usage = resp.usage_metadata.as_ref();
let input_tokens = usage.map(|u| u.prompt_token_count.unwrap_or(0)).unwrap_or(0);
let output_tokens = usage.map(|u| u.candidates_token_count.unwrap_or(0)).unwrap_or(0);
let stop_reason = match finish_reason.as_str() {
"STOP" => "end_turn",
"MAX_TOKENS" => "max_tokens",
"SAFETY" => "error",
"RECITATION" => "error",
_ => "end_turn",
};
yield Ok(StreamChunk::Complete {
input_tokens,
output_tokens,
stop_reason: stop_reason.to_string(),
});
}
}
}
}
Err(e) => {
tracing::warn!(target: "gemini_driver", "Failed to parse SSE event: {} - {}", e, data);
}
}
}
}
}
})
}
}
impl GeminiDriver {
    /// Convert a CompletionRequest into the Gemini API request format.
    ///
    /// Key mapping decisions:
    /// - `system` prompt maps to `systemInstruction`
    /// - Messages use Gemini's `contents` array with `role`/`parts`
    /// - Tool definitions use `functionDeclarations`
    /// - Tool results are sent as `functionResponse` parts in `user` messages
    fn build_api_request(&self, request: &CompletionRequest) -> GeminiRequest {
        let mut contents: Vec<GeminiContent> = Vec::new();
        for msg in &request.messages {
            match msg {
                zclaw_types::Message::User { content } => {
                    // Plain user text becomes a single text part.
                    contents.push(GeminiContent {
                        role: "user".to_string(),
                        parts: vec![GeminiPart {
                            text: Some(content.clone()),
                            inline_data: None,
                            function_call: None,
                            function_response: None,
                        }],
                    });
                }
                zclaw_types::Message::Assistant { content, thinking } => {
                    let mut parts = Vec::new();
                    // Gemini does not have a native "thinking" field, so we prepend
                    // any thinking content as a text part with a marker.
                    if let Some(think) = thinking {
                        if !think.is_empty() {
                            parts.push(GeminiPart {
                                text: Some(format!("[thinking]\n{}\n[/thinking]", think)),
                                inline_data: None,
                                function_call: None,
                                function_response: None,
                            });
                        }
                    }
                    parts.push(GeminiPart {
                        text: Some(content.clone()),
                        inline_data: None,
                        function_call: None,
                        function_response: None,
                    });
                    // Assistant turns use Gemini's "model" role.
                    contents.push(GeminiContent {
                        role: "model".to_string(),
                        parts,
                    });
                }
                zclaw_types::Message::ToolUse { id: _, tool, input } => {
                    // Tool use from the assistant is represented as a functionCall part
                    let args = if input.is_null() {
                        serde_json::json!({})
                    } else {
                        input.clone()
                    };
                    contents.push(GeminiContent {
                        role: "model".to_string(),
                        parts: vec![GeminiPart {
                            text: None,
                            inline_data: None,
                            function_call: Some(GeminiFunctionCall {
                                name: Some(tool.to_string()),
                                args: Some(args),
                            }),
                            function_response: None,
                        }],
                    });
                }
                zclaw_types::Message::ToolResult { tool_call_id, tool, output, is_error } => {
                    // Tool results are sent as functionResponse parts in a "user" role message.
                    // Gemini requires that function responses reference the function name
                    // and include the response wrapped in a "result" or "error" key.
                    let response_content = if *is_error {
                        serde_json::json!({ "error": output.to_string() })
                    } else {
                        serde_json::json!({ "result": output.clone() })
                    };
                    contents.push(GeminiContent {
                        role: "user".to_string(),
                        parts: vec![GeminiPart {
                            text: None,
                            inline_data: None,
                            function_call: None,
                            function_response: Some(GeminiFunctionResponse {
                                name: tool.to_string(),
                                response: response_content,
                            }),
                        }],
                    });
                    // Gemini ignores tool_call_id, but we log it for debugging
                    let _ = tool_call_id;
                }
                zclaw_types::Message::System { content } => {
                    // System messages are converted to user messages with system context.
                    // Note: the primary system prompt is handled via systemInstruction.
                    // Inline system messages in conversation history become user messages.
                    contents.push(GeminiContent {
                        role: "user".to_string(),
                        parts: vec![GeminiPart {
                            text: Some(content.clone()),
                            inline_data: None,
                            function_call: None,
                            function_response: None,
                        }],
                    });
                }
            }
        }
        // Build tool declarations
        let function_declarations: Vec<GeminiFunctionDeclaration> = request.tools
            .iter()
            .map(|t| GeminiFunctionDeclaration {
                name: t.name.clone(),
                description: t.description.clone(),
                parameters: t.input_schema.clone(),
            })
            .collect();
        // Build generation config
        let mut generation_config = GeminiGenerationConfig::default();
        if let Some(temp) = request.temperature {
            generation_config.temperature = Some(temp);
        }
        if let Some(max) = request.max_tokens {
            generation_config.max_output_tokens = Some(max);
        }
        if !request.stop.is_empty() {
            generation_config.stop_sequences = Some(request.stop.clone());
        }
        // Build system instruction
        let system_instruction = request.system.as_ref().map(|s| GeminiSystemInstruction {
            parts: vec![GeminiPart {
                text: Some(s.clone()),
                inline_data: None,
                function_call: None,
                function_response: None,
            }],
        });
        GeminiRequest {
            contents,
            system_instruction,
            generation_config: Some(generation_config),
            // Omit `tools` entirely when no functions are declared.
            tools: if function_declarations.is_empty() {
                None
            } else {
                Some(vec![GeminiTool {
                    function_declarations,
                }])
            },
        }
    }
    /// Convert a Gemini API response into a CompletionResponse.
    ///
    /// Maps the first candidate's parts into content blocks, recovers any
    /// `[thinking]` marker text injected by `build_api_request`, and derives
    /// the stop reason from the finish reason (falling back to ToolUse when
    /// a function call was present without an explicit reason).
    fn convert_response(&self, api_response: GeminiResponse, model: String) -> CompletionResponse {
        // Only the first candidate is used; additional candidates are dropped.
        let candidate = api_response.candidates.first();
        let (content, stop_reason) = match candidate {
            Some(c) => {
                let parts = c.content.as_ref()
                    .map(|content| content.parts.as_slice())
                    .unwrap_or(&[]);
                let mut blocks: Vec<ContentBlock> = Vec::new();
                let mut has_tool_use = false;
                for part in parts {
                    // Handle text content
                    if let Some(text) = &part.text {
                        // Skip thinking markers we injected
                        if text.starts_with("[thinking]\n") && text.contains("[/thinking]") {
                            let thinking_content = text
                                .strip_prefix("[thinking]\n")
                                .and_then(|s| s.strip_suffix("\n[/thinking]"))
                                .unwrap_or("");
                            if !thinking_content.is_empty() {
                                blocks.push(ContentBlock::Thinking {
                                    thinking: thinking_content.to_string(),
                                });
                            }
                        } else if !text.is_empty() {
                            blocks.push(ContentBlock::Text { text: text.clone() });
                        }
                    }
                    // Handle function call (tool use)
                    if let Some(fc) = &part.function_call {
                        has_tool_use = true;
                        // Synthetic id derived from block position; Gemini has no call ids.
                        blocks.push(ContentBlock::ToolUse {
                            id: format!("gemini_call_{}", blocks.len()),
                            name: fc.name.clone().unwrap_or_default(),
                            input: fc.args.clone().unwrap_or(serde_json::Value::Object(Default::default())),
                        });
                    }
                }
                // If there are no content blocks, add an empty text block
                if blocks.is_empty() {
                    blocks.push(ContentBlock::Text { text: String::new() });
                }
                let stop = match c.finish_reason.as_deref() {
                    Some("STOP") => StopReason::EndTurn,
                    Some("MAX_TOKENS") => StopReason::MaxTokens,
                    Some("SAFETY") => StopReason::Error,
                    Some("RECITATION") => StopReason::Error,
                    Some("TOOL_USE") => StopReason::ToolUse,
                    _ => {
                        if has_tool_use {
                            StopReason::ToolUse
                        } else {
                            StopReason::EndTurn
                        }
                    }
                };
                (blocks, stop)
            }
            None => {
                tracing::warn!(target: "gemini_driver", "No candidates in response");
                (
                    vec![ContentBlock::Text { text: String::new() }],
                    StopReason::EndTurn,
                )
            }
        };
        // Usage metadata may be absent; default token counts to zero.
        let usage = api_response.usage_metadata.as_ref();
        let input_tokens = usage.map(|u| u.prompt_token_count.unwrap_or(0)).unwrap_or(0);
        let output_tokens = usage.map(|u| u.candidates_token_count.unwrap_or(0)).unwrap_or(0);
        CompletionResponse {
            content,
            model,
            input_tokens,
            output_tokens,
            stop_reason,
        }
    }
}
// ---------------------------------------------------------------------------
// Gemini API request types
// ---------------------------------------------------------------------------
/// Top-level request body for `models/{model}:generateContent`.
///
/// Serialized in camelCase so `system_instruction` / `generation_config`
/// reach the API as `systemInstruction` / `generationConfig` — the canonical
/// field names in the Gemini REST API (and consistent with the explicit
/// camelCase renames used elsewhere in this file).
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct GeminiRequest {
    contents: Vec<GeminiContent>,
    #[serde(skip_serializing_if = "Option::is_none")]
    system_instruction: Option<GeminiSystemInstruction>,
    #[serde(skip_serializing_if = "Option::is_none")]
    generation_config: Option<GeminiGenerationConfig>,
    #[serde(skip_serializing_if = "Option::is_none")]
    tools: Option<Vec<GeminiTool>>,
}
/// One turn in the `contents` array: a role ("user" or "model") plus parts.
#[derive(Serialize)]
struct GeminiContent {
    role: String,
    parts: Vec<GeminiPart>,
}
/// A single content part within a Gemini `contents` entry.
///
/// At most one field is normally set per part. Serialized in camelCase to
/// match the API (`inlineData`, `functionCall`, `functionResponse`); the
/// container-level rename subsumes the previous per-field renames and also
/// fixes `inline_data`, which was sent in snake_case.
#[derive(Serialize, Clone)]
#[serde(rename_all = "camelCase")]
struct GeminiPart {
    #[serde(skip_serializing_if = "Option::is_none")]
    text: Option<String>,
    // Inline binary data (e.g. images); passed through as raw JSON.
    #[serde(skip_serializing_if = "Option::is_none")]
    inline_data: Option<serde_json::Value>,
    #[serde(skip_serializing_if = "Option::is_none")]
    function_call: Option<GeminiFunctionCall>,
    #[serde(skip_serializing_if = "Option::is_none")]
    function_response: Option<GeminiFunctionResponse>,
}
/// System prompt wrapper; sent as the request's `systemInstruction`.
#[derive(Serialize)]
struct GeminiSystemInstruction {
    parts: Vec<GeminiPart>,
}
/// Generation parameters. Serialized in camelCase (`maxOutputTokens`,
/// `stopSequences`) as the API expects; the container-level rename replaces
/// the previous per-field rename and fixes `max_output_tokens`, which was
/// sent in snake_case.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct GeminiGenerationConfig {
    #[serde(skip_serializing_if = "Option::is_none")]
    temperature: Option<f32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    max_output_tokens: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    stop_sequences: Option<Vec<String>>,
}
impl Default for GeminiGenerationConfig {
    /// All fields unset — serialization omits them and the API applies
    /// model defaults.
    fn default() -> Self {
        Self {
            temperature: None,
            max_output_tokens: None,
            stop_sequences: None,
        }
    }
}
/// Tool bundle exposing function declarations to the model.
#[derive(Serialize)]
struct GeminiTool {
    #[serde(rename = "functionDeclarations")]
    function_declarations: Vec<GeminiFunctionDeclaration>,
}
/// A single callable function: name, description and JSON-schema parameters.
#[derive(Serialize)]
struct GeminiFunctionDeclaration {
    name: String,
    description: String,
    // JSON Schema describing the function's arguments.
    parameters: serde_json::Value,
}
/// A function call emitted by the model (name plus JSON arguments).
#[derive(Serialize, Clone)]
struct GeminiFunctionCall {
    #[serde(skip_serializing_if = "Option::is_none")]
    name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    args: Option<serde_json::Value>,
}
/// The result of a tool invocation, sent back to the model as a
/// `functionResponse` part.
#[derive(Serialize, Clone)]
struct GeminiFunctionResponse {
    // Must match the name of the function that was called.
    name: String,
    response: serde_json::Value,
}
// ---------------------------------------------------------------------------
// Gemini API response types
// ---------------------------------------------------------------------------
/// Response body for `generateContent`.
///
/// The API emits camelCase JSON (`usageMetadata`), so deserialization must
/// be camelCase-aware; without the rename the field silently stayed `None`
/// and token usage always reported zero.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct GeminiResponse {
    #[serde(default)]
    candidates: Vec<GeminiCandidate>,
    #[serde(default)]
    usage_metadata: Option<GeminiUsageMetadata>,
}
/// A single response candidate.
///
/// `finish_reason` arrives as `finishReason` in the JSON; camelCase-aware
/// deserialization is required or the stop reason is never observed
/// (breaking both stop-reason mapping and the streaming Complete event).
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct GeminiCandidate {
    #[serde(default)]
    content: Option<GeminiResponseContent>,
    #[serde(default)]
    finish_reason: Option<String>,
}
/// Content of a response candidate: the generated parts plus the role label.
#[derive(Debug, Deserialize)]
struct GeminiResponseContent {
    #[serde(default)]
    parts: Vec<GeminiResponsePart>,
    #[serde(default)]
    #[allow(dead_code)] // parsed but unused; kept for schema completeness
    role: Option<String>,
}
/// One generated part: text and/or a function call.
#[derive(Debug, Deserialize)]
struct GeminiResponsePart {
    #[serde(default)]
    text: Option<String>,
    #[serde(rename = "functionCall", default)]
    function_call: Option<GeminiResponseFunctionCall>,
}
/// Function call payload inside a response part.
#[derive(Debug, Deserialize)]
struct GeminiResponseFunctionCall {
    #[serde(default)]
    name: Option<String>,
    #[serde(default)]
    args: Option<serde_json::Value>,
}
/// Token accounting block.
///
/// The API emits camelCase keys (`promptTokenCount`, `candidatesTokenCount`,
/// `totalTokenCount`); without camelCase-aware deserialization every field
/// stayed `None` and usage reported as zero.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct GeminiUsageMetadata {
    #[serde(default)]
    prompt_token_count: Option<u32>,
    #[serde(default)]
    candidates_token_count: Option<u32>,
    #[serde(default)]
    #[allow(dead_code)]
    total_token_count: Option<u32>,
}
// ---------------------------------------------------------------------------
// Gemini streaming types
// ---------------------------------------------------------------------------
/// Streaming response from the Gemini SSE endpoint.
/// Each SSE event contains the same structure as the non-streaming response,
/// but with incremental content.
/// Streaming response from the Gemini SSE endpoint.
/// Each SSE event contains the same structure as the non-streaming response,
/// but with incremental content. CamelCase-aware deserialization is required
/// for `usageMetadata` — otherwise streamed usage is always `None`.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct GeminiStreamResponse {
    #[serde(default)]
    candidates: Vec<GeminiCandidate>,
    #[serde(default)]
    usage_metadata: Option<GeminiUsageMetadata>,
}

View File

@@ -1,40 +1,250 @@
//! Local LLM driver (Ollama, LM Studio, vLLM, etc.)
//!
//! Uses the OpenAI-compatible API format. The only differences from the
//! OpenAI driver are: no API key is required, and base_url points to a
//! local server.
use async_trait::async_trait;
use futures::Stream;
use async_stream::stream;
use futures::{Stream, StreamExt};
use reqwest::Client;
use serde::{Deserialize, Serialize};
use std::pin::Pin;
use zclaw_types::{Result, ZclawError};
use super::{CompletionRequest, CompletionResponse, ContentBlock, LlmDriver, StopReason};
use crate::stream::StreamChunk;
/// Local LLM driver for Ollama, LM Studio, vLLM, etc.
#[allow(dead_code)] // TODO: Implement full Local driver support
/// Local LLM driver for Ollama, LM Studio, vLLM, and other OpenAI-compatible servers.
pub struct LocalDriver {
client: Client,
base_url: String,
}
impl LocalDriver {
/// Create a driver pointing at a custom OpenAI-compatible endpoint.
///
/// The `base_url` should end with `/v1` (e.g. `http://localhost:8080/v1`).
pub fn new(base_url: impl Into<String>) -> Self {
Self {
client: Client::new(),
client: Client::builder()
.user_agent(crate::USER_AGENT)
.http1_only()
.timeout(std::time::Duration::from_secs(300)) // 5 min -- local inference can be slow
.connect_timeout(std::time::Duration::from_secs(10)) // short connect timeout
.build()
.unwrap_or_else(|_| Client::new()),
base_url: base_url.into(),
}
}
/// Ollama default endpoint (`http://localhost:11434/v1`).
pub fn ollama() -> Self {
Self::new("http://localhost:11434/v1")
}
/// LM Studio default endpoint (`http://localhost:1234/v1`).
pub fn lm_studio() -> Self {
Self::new("http://localhost:1234/v1")
}
/// vLLM default endpoint (`http://localhost:8000/v1`).
pub fn vllm() -> Self {
Self::new("http://localhost:8000/v1")
}
// ----------------------------------------------------------------
// Request / response conversion (OpenAI-compatible format)
// ----------------------------------------------------------------
fn build_api_request(&self, request: &CompletionRequest) -> LocalApiRequest {
let messages: Vec<LocalApiMessage> = request
.messages
.iter()
.filter_map(|msg| match msg {
zclaw_types::Message::User { content } => Some(LocalApiMessage {
role: "user".to_string(),
content: Some(content.clone()),
tool_calls: None,
}),
zclaw_types::Message::Assistant {
content,
thinking: _,
} => Some(LocalApiMessage {
role: "assistant".to_string(),
content: Some(content.clone()),
tool_calls: None,
}),
zclaw_types::Message::System { content } => Some(LocalApiMessage {
role: "system".to_string(),
content: Some(content.clone()),
tool_calls: None,
}),
zclaw_types::Message::ToolUse {
id, tool, input, ..
} => {
let args = if input.is_null() {
"{}".to_string()
} else {
serde_json::to_string(input).unwrap_or_else(|_| "{}".to_string())
};
Some(LocalApiMessage {
role: "assistant".to_string(),
content: None,
tool_calls: Some(vec![LocalApiToolCall {
id: id.clone(),
r#type: "function".to_string(),
function: LocalFunctionCall {
name: tool.to_string(),
arguments: args,
},
}]),
})
}
zclaw_types::Message::ToolResult {
output, is_error, ..
} => Some(LocalApiMessage {
role: "tool".to_string(),
content: Some(if *is_error {
format!("Error: {}", output)
} else {
output.to_string()
}),
tool_calls: None,
}),
})
.collect();
// Prepend system prompt when provided.
let mut messages = messages;
if let Some(system) = &request.system {
messages.insert(
0,
LocalApiMessage {
role: "system".to_string(),
content: Some(system.clone()),
tool_calls: None,
},
);
}
let tools: Vec<LocalApiTool> = request
.tools
.iter()
.map(|t| LocalApiTool {
r#type: "function".to_string(),
function: LocalFunctionDef {
name: t.name.clone(),
description: t.description.clone(),
parameters: t.input_schema.clone(),
},
})
.collect();
LocalApiRequest {
model: request.model.clone(),
messages,
max_tokens: request.max_tokens,
temperature: request.temperature,
stop: if request.stop.is_empty() {
None
} else {
Some(request.stop.clone())
},
stream: request.stream,
tools: if tools.is_empty() {
None
} else {
Some(tools)
},
}
}
/// Translate an OpenAI-compatible API response into the driver-neutral
/// [`CompletionResponse`].
///
/// Only the first choice is inspected. Tool calls take priority over
/// text content; when neither is present an empty text block is emitted
/// so the response always carries at least one content block.
fn convert_response(
    &self,
    api_response: LocalApiResponse,
    model: String,
) -> CompletionResponse {
    // Token usage defaults to zero when the server omits `usage`.
    let (input_tokens, output_tokens) = match api_response.usage {
        Some(u) => (u.prompt_tokens, u.completion_tokens),
        None => (0, 0),
    };

    let mut content = Vec::new();
    let mut stop_reason = StopReason::EndTurn;

    if let Some(choice) = api_response.choices.first() {
        // Map the OpenAI finish_reason onto our stop reasons; anything
        // unrecognized (or absent) is treated as a normal end of turn.
        match choice.finish_reason.as_deref() {
            Some("length") => stop_reason = StopReason::MaxTokens,
            Some("tool_calls") => stop_reason = StopReason::ToolUse,
            _ => {}
        }

        let tool_calls = choice.message.tool_calls.as_deref().unwrap_or(&[]);
        if !tool_calls.is_empty() {
            for tc in tool_calls {
                // Unparseable argument JSON degrades to Null rather than
                // failing the whole response.
                let input: serde_json::Value = serde_json::from_str(&tc.function.arguments)
                    .unwrap_or(serde_json::Value::Null);
                content.push(ContentBlock::ToolUse {
                    id: tc.id.clone(),
                    name: tc.function.name.clone(),
                    input,
                });
            }
        } else {
            // Empty or missing content still yields a (possibly empty)
            // text block.
            let text = choice
                .message
                .content
                .clone()
                .filter(|t| !t.is_empty())
                .unwrap_or_default();
            content.push(ContentBlock::Text { text });
        }
    } else {
        // No choices at all: synthesize an empty text block.
        content.push(ContentBlock::Text {
            text: String::new(),
        });
    }

    CompletionResponse {
        content,
        model,
        input_tokens,
        output_tokens,
        stop_reason,
    }
}
/// Build a `reqwest::RequestBuilder` POST for `url`.
///
/// Only an `Accept: */*` header is set. Ollama needs no authentication.
///
/// NOTE(review): the previous doc claimed an optional Authorization
/// header is sent when an API key is configured (for LM Studio / vLLM),
/// but no key is read or attached here — confirm whether API-key
/// support was intended and dropped, or never implemented.
fn authenticated_post(&self, url: &str) -> reqwest::RequestBuilder {
    self.client.post(url).header("Accept", "*/*")
}
}
#[async_trait]
@@ -44,30 +254,394 @@ impl LlmDriver for LocalDriver {
}
/// Local drivers never require an API key, so they are always
/// considered configured.
fn is_configured(&self) -> bool {
    // Removed a duplicated stale comment left over from a merge/diff.
    true
}
/// Send a non-streaming chat completion to the local OpenAI-compatible
/// server (`POST {base_url}/chat/completions`).
///
/// # Errors
///
/// Returns [`ZclawError::LlmError`] when the server is unreachable
/// (with a connection hint appended), responds with a non-2xx status,
/// or returns a body that cannot be parsed as an OpenAI-style response.
async fn complete(&self, request: CompletionRequest) -> Result<CompletionResponse> {
    // The stale "not yet implemented" placeholder body (diff residue)
    // has been removed; this is the real implementation.
    let api_request = self.build_api_request(&request);
    let url = format!("{}/chat/completions", self.base_url);
    tracing::debug!(target: "local_driver", "Sending request to {}", url);
    tracing::trace!(
        target: "local_driver",
        "Request body: {}",
        serde_json::to_string(&api_request).unwrap_or_default()
    );
    let response = self
        .authenticated_post(&url)
        .json(&api_request)
        .send()
        .await
        .map_err(|e| {
            let hint = connection_error_hint(&e);
            ZclawError::LlmError(format!(
                "Failed to connect to local LLM server at {}: {}{}",
                self.base_url, e, hint
            ))
        })?;
    if !response.status().is_success() {
        let status = response.status();
        let body = response.text().await.unwrap_or_default();
        tracing::warn!(target: "local_driver", "API error {}: {}", status, body);
        return Err(ZclawError::LlmError(format!(
            "Local LLM API error {}: {}",
            status, body
        )));
    }
    let api_response: LocalApiResponse = response
        .json()
        .await
        .map_err(|e| ZclawError::LlmError(format!("Failed to parse response: {}", e)))?;
    Ok(self.convert_response(api_response, request.model))
}
/// Stream a chat completion from the local OpenAI-compatible server.
///
/// Sends the request with `stream: true` and translates the resulting
/// Server-Sent Events into [`StreamChunk`]s: text deltas are forwarded
/// as they arrive, while tool-call fragments are accumulated per call
/// id and flushed as `ToolUseEnd` when the server emits `data: [DONE]`.
fn stream(
    &self,
    request: CompletionRequest,
) -> Pin<Box<dyn Stream<Item = Result<StreamChunk>> + Send + '_>> {
    let mut stream_request = self.build_api_request(&request);
    stream_request.stream = true;
    let url = format!("{}/chat/completions", self.base_url);
    tracing::debug!(target: "local_driver", "Starting stream to {}", url);
    Box::pin(stream! {
        let response = match self
            .authenticated_post(&url)
            .header("Content-Type", "application/json")
            // Local inference can be slow; allow a generous timeout.
            .timeout(std::time::Duration::from_secs(300))
            .json(&stream_request)
            .send()
            .await
        {
            Ok(r) => {
                tracing::debug!(target: "local_driver", "Stream response status: {}", r.status());
                r
            }
            Err(e) => {
                let hint = connection_error_hint(&e);
                tracing::error!(target: "local_driver", "Stream connection failed: {}{}", e, hint);
                yield Err(ZclawError::LlmError(format!(
                    "Failed to connect to local LLM server at {}: {}{}",
                    self.base_url, e, hint
                )));
                return;
            }
        };
        if !response.status().is_success() {
            let status = response.status();
            let body = response.text().await.unwrap_or_default();
            yield Err(ZclawError::LlmError(format!("API error {}: {}", status, body)));
            return;
        }
        let mut byte_stream = response.bytes_stream();
        // Per-call-id accumulator: id -> (function name, JSON argument string).
        let mut accumulated_tool_calls: std::collections::HashMap<String, (String, String)> =
            std::collections::HashMap::new();
        // Some servers omit the call id on argument deltas; remember the
        // most recently announced id so fragments can be attributed.
        let mut current_tool_id: Option<String> = None;
        // SSE events can be split mid-line across network chunks. Buffer
        // partial lines here and only parse complete (newline-terminated)
        // ones; the previous per-chunk `lines()` loop garbled split events.
        let mut line_buf = String::new();
        while let Some(chunk_result) = byte_stream.next().await {
            let chunk = match chunk_result {
                Ok(c) => c,
                Err(e) => {
                    yield Err(ZclawError::LlmError(format!("Stream error: {}", e)));
                    continue;
                }
            };
            line_buf.push_str(&String::from_utf8_lossy(&chunk));
            // Split off the complete lines; keep any trailing partial line buffered.
            let ready = match line_buf.rfind('\n') {
                Some(pos) => {
                    let head = line_buf[..=pos].to_string();
                    line_buf.drain(..=pos);
                    head
                }
                None => continue,
            };
            for line in ready.lines() {
                // Tolerate CRLF framing from some servers.
                let line = line.trim_end_matches('\r');
                if let Some(data) = line.strip_prefix("data: ") {
                    if data == "[DONE]" {
                        tracing::debug!(
                            target: "local_driver",
                            "Stream done, tool_calls accumulated: {}",
                            accumulated_tool_calls.len()
                        );
                        // Flush every accumulated tool call as a ToolUseEnd.
                        for (id, (name, args)) in &accumulated_tool_calls {
                            if name.is_empty() {
                                tracing::warn!(
                                    target: "local_driver",
                                    "Skipping tool call with empty name: id={}",
                                    id
                                );
                                continue;
                            }
                            let parsed_args: serde_json::Value = if args.is_empty() {
                                serde_json::json!({})
                            } else {
                                serde_json::from_str(args).unwrap_or_else(|e| {
                                    tracing::warn!(
                                        target: "local_driver",
                                        "Failed to parse tool args '{}': {}",
                                        args, e
                                    );
                                    serde_json::json!({})
                                })
                            };
                            yield Ok(StreamChunk::ToolUseEnd {
                                id: id.clone(),
                                input: parsed_args,
                            });
                        }
                        // NOTE(review): stop_reason is always "end_turn" and
                        // token counts are zeroed even when tool calls were
                        // emitted — confirm consumers key off the ToolUse*
                        // chunks rather than this field.
                        yield Ok(StreamChunk::Complete {
                            input_tokens: 0,
                            output_tokens: 0,
                            stop_reason: "end_turn".to_string(),
                        });
                        continue;
                    }
                    match serde_json::from_str::<LocalStreamResponse>(data) {
                        Ok(resp) => {
                            if let Some(choice) = resp.choices.first() {
                                let delta = &choice.delta;
                                // Text content
                                if let Some(content) = &delta.content {
                                    if !content.is_empty() {
                                        yield Ok(StreamChunk::TextDelta {
                                            delta: content.clone(),
                                        });
                                    }
                                }
                                // Tool calls
                                if let Some(tool_calls) = &delta.tool_calls {
                                    for tc in tool_calls {
                                        // A delta with an id starts (or re-keys)
                                        // a tool call.
                                        if let Some(id) = &tc.id {
                                            let name = tc
                                                .function
                                                .as_ref()
                                                .and_then(|f| f.name.clone())
                                                .unwrap_or_default();
                                            if !name.is_empty() {
                                                current_tool_id = Some(id.clone());
                                                accumulated_tool_calls
                                                    .insert(id.clone(), (name.clone(), String::new()));
                                                yield Ok(StreamChunk::ToolUseStart {
                                                    id: id.clone(),
                                                    name,
                                                });
                                            } else {
                                                // Name may arrive in a later delta;
                                                // reserve the slot without emitting
                                                // a start event yet.
                                                current_tool_id = Some(id.clone());
                                                accumulated_tool_calls
                                                    .insert(id.clone(), (String::new(), String::new()));
                                            }
                                        }
                                        // Argument fragments append to the call
                                        // they belong to (explicit id, else the
                                        // last one seen).
                                        if let Some(function) = &tc.function {
                                            if let Some(args) = &function.arguments {
                                                let tool_id = tc
                                                    .id
                                                    .as_ref()
                                                    .or(current_tool_id.as_ref())
                                                    .cloned()
                                                    .unwrap_or_default();
                                                yield Ok(StreamChunk::ToolUseDelta {
                                                    id: tool_id.clone(),
                                                    delta: args.clone(),
                                                });
                                                if let Some(entry) =
                                                    accumulated_tool_calls.get_mut(&tool_id)
                                                {
                                                    entry.1.push_str(args);
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                        }
                        Err(e) => {
                            tracing::warn!(
                                target: "local_driver",
                                "Failed to parse SSE: {}, data: {}",
                                e, data
                            );
                        }
                    }
                }
            }
        }
        // NOTE(review): a final event without a trailing newline would be
        // left in `line_buf` and dropped; SSE framing terminates every
        // event with "\n\n", so this should not occur in practice.
    })
}
}
// ---------------------------------------------------------------------------
// Connection-error diagnostics
// ---------------------------------------------------------------------------
/// Return a human-readable hint when the local server appears to be
/// unreachable or too slow.
///
/// Callers append this hint to an error message that already contains
/// the configured base URL, so the hint itself stays URL-agnostic.
/// (The previous version hard-coded "localhost" into the hint via a
/// pointless `format!`, which was misleading for non-default base URLs.)
fn connection_error_hint(error: &reqwest::Error) -> String {
    if error.is_connect() {
        "\n\nHint: Is the local LLM server running at the configured base URL?\n\
         Make sure the server is started before using this driver."
            .to_string()
    } else if error.is_timeout() {
        "\n\nHint: The request timed out. Local inference can be slow -- \
         try a smaller model or increase the timeout."
            .to_string()
    } else {
        String::new()
    }
}
// ---------------------------------------------------------------------------
// OpenAI-compatible API types (private to this module)
// ---------------------------------------------------------------------------
/// Request payload for `POST /chat/completions` (OpenAI-compatible).
///
/// Optional fields are omitted from the JSON entirely when `None` so
/// that strict local servers do not receive `null` values.
#[derive(Serialize)]
struct LocalApiRequest {
    model: String,
    messages: Vec<LocalApiMessage>,
    #[serde(skip_serializing_if = "Option::is_none")]
    max_tokens: Option<u32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    temperature: Option<f32>,
    /// Stop sequences; omitted when the caller supplied none.
    #[serde(skip_serializing_if = "Option::is_none")]
    stop: Option<Vec<String>>,
    /// Request SSE streaming; always serialized (`false` for one-shot
    /// calls). The previous `#[serde(default)]` here was dead: `default`
    /// only affects `Deserialize`, and this struct is Serialize-only.
    stream: bool,
    #[serde(skip_serializing_if = "Option::is_none")]
    tools: Option<Vec<LocalApiTool>>,
}
/// A single chat message in the request payload.
///
/// Optional fields are dropped from the JSON when `None` (e.g. tool
/// messages carry content but no `tool_calls`).
#[derive(Serialize)]
struct LocalApiMessage {
    role: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    content: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    tool_calls: Option<Vec<LocalApiToolCall>>,
}

/// A tool invocation echoed back to the server as conversation history.
#[derive(Serialize)]
struct LocalApiToolCall {
    id: String,
    // Serialized as the JSON key "type"; set to "function" by the builder.
    r#type: String,
    function: LocalFunctionCall,
}

/// Function name plus its arguments as a raw JSON string (OpenAI wire
/// format keeps arguments stringified, not as a nested object).
#[derive(Serialize)]
struct LocalFunctionCall {
    name: String,
    arguments: String,
}

/// A tool definition advertised to the model.
#[derive(Serialize)]
struct LocalApiTool {
    r#type: String,
    function: LocalFunctionDef,
}

/// Function schema: name, description, and JSON-Schema `parameters`.
#[derive(Serialize)]
struct LocalFunctionDef {
    name: String,
    description: String,
    parameters: serde_json::Value,
}
// --- Response types ---
//
// Every field carries #[serde(default)] so that partial or slightly
// nonstandard server responses deserialize to defaults instead of
// failing the whole request.

/// Top-level non-streaming response from `/chat/completions`.
#[derive(Deserialize, Default)]
struct LocalApiResponse {
    #[serde(default)]
    choices: Vec<LocalApiChoice>,
    #[serde(default)]
    usage: Option<LocalApiUsage>,
}

/// One completion choice; only the first is used by this driver.
#[derive(Deserialize, Default)]
struct LocalApiChoice {
    #[serde(default)]
    message: LocalApiResponseMessage,
    #[serde(default)]
    finish_reason: Option<String>,
}

/// The assistant message inside a choice: text content and/or tool calls.
#[derive(Deserialize, Default)]
struct LocalApiResponseMessage {
    #[serde(default)]
    content: Option<String>,
    #[serde(default)]
    tool_calls: Option<Vec<LocalApiToolCallResponse>>,
}

/// A tool call returned by the model.
#[derive(Deserialize, Default)]
struct LocalApiToolCallResponse {
    #[serde(default)]
    id: String,
    #[serde(default)]
    function: LocalFunctionCallResponse,
}

/// Function name and its arguments as a raw JSON string.
#[derive(Deserialize, Default)]
struct LocalFunctionCallResponse {
    #[serde(default)]
    name: String,
    #[serde(default)]
    arguments: String,
}

/// Token accounting reported by the server.
#[derive(Deserialize, Default)]
struct LocalApiUsage {
    #[serde(default)]
    prompt_tokens: u32,
    #[serde(default)]
    completion_tokens: u32,
}
// --- Streaming types ---
//
// Shapes of the SSE `data:` payloads emitted during streaming. All
// fields default so malformed or partial events degrade gracefully.

/// One SSE event body from a streaming `/chat/completions` call.
#[derive(Debug, Deserialize)]
struct LocalStreamResponse {
    #[serde(default)]
    choices: Vec<LocalStreamChoice>,
}

/// A streamed choice carrying an incremental delta.
#[derive(Debug, Deserialize)]
struct LocalStreamChoice {
    #[serde(default)]
    delta: LocalDelta,
    #[serde(default)]
    #[allow(dead_code)] // Deserialized from SSE, not accessed in code
    finish_reason: Option<String>,
}

/// Incremental update: new text and/or tool-call fragments.
#[derive(Debug, Deserialize, Default)]
struct LocalDelta {
    #[serde(default)]
    content: Option<String>,
    #[serde(default)]
    tool_calls: Option<Vec<LocalToolCallDelta>>,
}

/// A fragment of a tool call; `id` appears only on the first fragment
/// for some servers.
#[derive(Debug, Deserialize)]
struct LocalToolCallDelta {
    #[serde(default)]
    id: Option<String>,
    #[serde(default)]
    function: Option<LocalFunctionDelta>,
}

/// Partial function data: the name and/or another slice of the
/// JSON-encoded argument string.
#[derive(Debug, Deserialize)]
struct LocalFunctionDelta {
    #[serde(default)]
    name: Option<String>,
    #[serde(default)]
    arguments: Option<String>,
}

View File

@@ -12,6 +12,7 @@ pub mod loop_runner;
pub mod loop_guard;
pub mod stream;
pub mod growth;
pub mod compaction;
// Re-export main types
pub use driver::{

View File

@@ -11,6 +11,7 @@ use crate::tool::{ToolRegistry, ToolContext, SkillExecutor};
use crate::tool::builtin::PathValidator;
use crate::loop_guard::LoopGuard;
use crate::growth::GrowthIntegration;
use crate::compaction;
use zclaw_memory::MemoryStore;
/// Agent loop runner
@@ -29,6 +30,8 @@ pub struct AgentLoop {
path_validator: Option<PathValidator>,
/// Growth system integration (optional)
growth: Option<GrowthIntegration>,
/// Compaction threshold in tokens (0 = disabled)
compaction_threshold: usize,
}
impl AgentLoop {
@@ -51,6 +54,7 @@ impl AgentLoop {
skill_executor: None,
path_validator: None,
growth: None,
compaction_threshold: 0,
}
}
@@ -101,6 +105,16 @@ impl AgentLoop {
self.growth = Some(growth);
}
/// Set compaction threshold in tokens (0 = disabled)
///
/// When the estimated token count of conversation history exceeds this
/// threshold, older messages are summarized into a single system message
/// and only recent messages are sent to the LLM.
pub fn with_compaction_threshold(mut self, threshold: usize) -> Self {
self.compaction_threshold = threshold;
self
}
/// Get growth integration reference
pub fn growth(&self) -> Option<&GrowthIntegration> {
self.growth.as_ref()
@@ -134,6 +148,11 @@ impl AgentLoop {
// Get all messages for context
let mut messages = self.memory.get_messages(&session_id).await?;
// Apply compaction if threshold is configured
if self.compaction_threshold > 0 {
messages = compaction::maybe_compact(messages, self.compaction_threshold);
}
// Enhance system prompt with growth memories
let enhanced_prompt = if let Some(ref growth) = self.growth {
let base = self.system_prompt.as_deref().unwrap_or("");
@@ -260,7 +279,12 @@ impl AgentLoop {
self.memory.append_message(&session_id, &user_message).await?;
// Get all messages for context
let messages = self.memory.get_messages(&session_id).await?;
let mut messages = self.memory.get_messages(&session_id).await?;
// Apply compaction if threshold is configured
if self.compaction_threshold > 0 {
messages = compaction::maybe_compact(messages, self.compaction_threshold);
}
// Enhance system prompt with growth memories
let enhanced_prompt = if let Some(ref growth) = self.growth {