refactor(desktop): split kernel_commands/pipeline_commands into modules, add SaaS client libs and gateway modules
Split monolithic kernel_commands.rs (2185 lines) and pipeline_commands.rs (1391 lines) into focused sub-modules under kernel_commands/ and pipeline_commands/ directories. Add gateway module (commands, config, io, runtime), health_check, and 15 new TypeScript client libraries for SaaS relay, auth, admin, telemetry, and kernel sub-systems (a2a, agent, chat, hands, skills, triggers). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
274
desktop/src-tauri/src/kernel_commands/chat.rs
Normal file
274
desktop/src-tauri/src/kernel_commands/chat.rs
Normal file
@@ -0,0 +1,274 @@
|
||||
//! Chat commands: send message, streaming chat
|
||||
|
||||
use std::sync::Arc;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tauri::{AppHandle, Emitter, State};
|
||||
use tokio::sync::Mutex;
|
||||
use zclaw_types::AgentId;
|
||||
|
||||
use super::{validate_agent_id, KernelState, SessionStreamGuard};
|
||||
use crate::intelligence::validation::validate_string_length;
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Request / Response types
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// Chat request
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ChatRequest {
|
||||
pub agent_id: String,
|
||||
pub message: String,
|
||||
}
|
||||
|
||||
/// Chat response
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ChatResponse {
|
||||
pub content: String,
|
||||
pub input_tokens: u32,
|
||||
pub output_tokens: u32,
|
||||
}
|
||||
|
||||
/// Streaming chat event for Tauri emission
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase", tag = "type")]
|
||||
pub enum StreamChatEvent {
|
||||
Delta { delta: String },
|
||||
ToolStart { name: String, input: serde_json::Value },
|
||||
ToolEnd { name: String, output: serde_json::Value },
|
||||
IterationStart { iteration: usize, max_iterations: usize },
|
||||
HandStart { name: String, params: serde_json::Value },
|
||||
HandEnd { name: String, result: serde_json::Value },
|
||||
Complete { input_tokens: u32, output_tokens: u32 },
|
||||
Error { message: String },
|
||||
}
|
||||
|
||||
/// Streaming chat request
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct StreamChatRequest {
|
||||
pub agent_id: String,
|
||||
pub session_id: String,
|
||||
pub message: String,
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Commands
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// Send a message to an agent
|
||||
#[tauri::command]
|
||||
pub async fn agent_chat(
|
||||
state: State<'_, KernelState>,
|
||||
request: ChatRequest,
|
||||
) -> Result<ChatResponse, String> {
|
||||
validate_agent_id(&request.agent_id)?;
|
||||
validate_string_length(&request.message, "message", 100000)
|
||||
.map_err(|e| format!("Invalid message: {}", e))?;
|
||||
|
||||
let kernel_lock = state.lock().await;
|
||||
let kernel = kernel_lock.as_ref()
|
||||
.ok_or_else(|| "Kernel not initialized. Call kernel_init first.".to_string())?;
|
||||
|
||||
let id: AgentId = request.agent_id.parse()
|
||||
.map_err(|_| "Invalid agent ID format".to_string())?;
|
||||
|
||||
let response = kernel.send_message(&id, request.message)
|
||||
.await
|
||||
.map_err(|e| format!("Chat failed: {}", e))?;
|
||||
|
||||
Ok(ChatResponse {
|
||||
content: response.content,
|
||||
input_tokens: response.input_tokens,
|
||||
output_tokens: response.output_tokens,
|
||||
})
|
||||
}
|
||||
|
||||
/// Send a message to an agent with streaming response
|
||||
///
|
||||
/// This command initiates a streaming chat session. Events are emitted
|
||||
/// via Tauri's event system with the name "stream:chunk" and include
|
||||
/// the session_id for routing.
|
||||
#[tauri::command]
|
||||
pub async fn agent_chat_stream(
|
||||
app: AppHandle,
|
||||
state: State<'_, KernelState>,
|
||||
identity_state: State<'_, crate::intelligence::IdentityManagerState>,
|
||||
heartbeat_state: State<'_, crate::intelligence::HeartbeatEngineState>,
|
||||
reflection_state: State<'_, crate::intelligence::ReflectionEngineState>,
|
||||
stream_guard: State<'_, SessionStreamGuard>,
|
||||
request: StreamChatRequest,
|
||||
) -> Result<(), String> {
|
||||
validate_agent_id(&request.agent_id)?;
|
||||
validate_string_length(&request.message, "message", 100000)
|
||||
.map_err(|e| format!("Invalid message: {}", e))?;
|
||||
|
||||
let id: AgentId = request.agent_id.parse()
|
||||
.map_err(|_| "Invalid agent ID format".to_string())?;
|
||||
|
||||
let session_id = request.session_id.clone();
|
||||
let agent_id_str = request.agent_id.clone();
|
||||
let message = request.message.clone();
|
||||
|
||||
// Session-level concurrency guard
|
||||
let session_mutex = stream_guard
|
||||
.entry(session_id.clone())
|
||||
.or_insert_with(|| Arc::new(Mutex::new(())));
|
||||
let _session_guard = session_mutex.try_lock()
|
||||
.map_err(|_| {
|
||||
tracing::warn!(
|
||||
"[agent_chat_stream] Session {} already has an active stream — rejecting",
|
||||
session_id
|
||||
);
|
||||
format!("Session {} already has an active stream", session_id)
|
||||
})?;
|
||||
|
||||
// AUTO-INIT HEARTBEAT
|
||||
{
|
||||
let mut engines = heartbeat_state.lock().await;
|
||||
if !engines.contains_key(&request.agent_id) {
|
||||
let engine = crate::intelligence::heartbeat::HeartbeatEngine::new(
|
||||
request.agent_id.clone(),
|
||||
None,
|
||||
);
|
||||
engines.insert(request.agent_id.clone(), engine);
|
||||
tracing::info!("[agent_chat_stream] Auto-initialized heartbeat for agent: {}", request.agent_id);
|
||||
}
|
||||
}
|
||||
|
||||
// PRE-CONVERSATION: Build intelligence-enhanced system prompt
|
||||
let enhanced_prompt = crate::intelligence_hooks::pre_conversation_hook(
|
||||
&request.agent_id,
|
||||
&request.message,
|
||||
&identity_state,
|
||||
).await.unwrap_or_default();
|
||||
|
||||
// Get the streaming receiver while holding the lock, then release it
|
||||
let (mut rx, llm_driver) = {
|
||||
let kernel_lock = state.lock().await;
|
||||
let kernel = kernel_lock.as_ref()
|
||||
.ok_or_else(|| "Kernel not initialized. Call kernel_init first.".to_string())?;
|
||||
|
||||
let driver = Some(kernel.driver());
|
||||
|
||||
let prompt_arg = if enhanced_prompt.is_empty() { None } else { Some(enhanced_prompt) };
|
||||
|
||||
let session_id_parsed = if session_id.is_empty() {
|
||||
None
|
||||
} else {
|
||||
match uuid::Uuid::parse_str(&session_id) {
|
||||
Ok(uuid) => Some(zclaw_types::SessionId::from_uuid(uuid)),
|
||||
Err(e) => {
|
||||
return Err(format!(
|
||||
"Invalid session_id '{}': {}. Cannot reuse conversation context.",
|
||||
session_id, e
|
||||
));
|
||||
}
|
||||
}
|
||||
};
|
||||
let rx = kernel.send_message_stream_with_prompt(&id, message.clone(), prompt_arg, session_id_parsed)
|
||||
.await
|
||||
.map_err(|e| format!("Failed to start streaming: {}", e))?;
|
||||
(rx, driver)
|
||||
};
|
||||
|
||||
let hb_state = heartbeat_state.inner().clone();
|
||||
let rf_state = reflection_state.inner().clone();
|
||||
|
||||
// Spawn a task to process stream events with timeout guard
|
||||
tokio::spawn(async move {
|
||||
use zclaw_runtime::LoopEvent;
|
||||
|
||||
tracing::debug!("[agent_chat_stream] Starting stream processing for session: {}", session_id);
|
||||
|
||||
let stream_timeout = tokio::time::Duration::from_secs(300);
|
||||
|
||||
loop {
|
||||
match tokio::time::timeout(stream_timeout, rx.recv()).await {
|
||||
Ok(Some(event)) => {
|
||||
let stream_event = match &event {
|
||||
LoopEvent::Delta(delta) => {
|
||||
tracing::trace!("[agent_chat_stream] Delta: {} bytes", delta.len());
|
||||
StreamChatEvent::Delta { delta: delta.clone() }
|
||||
}
|
||||
LoopEvent::ToolStart { name, input } => {
|
||||
tracing::debug!("[agent_chat_stream] ToolStart: {}", name);
|
||||
if name.starts_with("hand_") {
|
||||
StreamChatEvent::HandStart { name: name.clone(), params: input.clone() }
|
||||
} else {
|
||||
StreamChatEvent::ToolStart { name: name.clone(), input: input.clone() }
|
||||
}
|
||||
}
|
||||
LoopEvent::ToolEnd { name, output } => {
|
||||
tracing::debug!("[agent_chat_stream] ToolEnd: {}", name);
|
||||
if name.starts_with("hand_") {
|
||||
StreamChatEvent::HandEnd { name: name.clone(), result: output.clone() }
|
||||
} else {
|
||||
StreamChatEvent::ToolEnd { name: name.clone(), output: output.clone() }
|
||||
}
|
||||
}
|
||||
LoopEvent::IterationStart { iteration, max_iterations } => {
|
||||
tracing::debug!("[agent_chat_stream] IterationStart: {}/{}", iteration, max_iterations);
|
||||
StreamChatEvent::IterationStart { iteration: *iteration, max_iterations: *max_iterations }
|
||||
}
|
||||
LoopEvent::Complete(result) => {
|
||||
tracing::info!("[agent_chat_stream] Complete: input_tokens={}, output_tokens={}",
|
||||
result.input_tokens, result.output_tokens);
|
||||
|
||||
let agent_id_hook = agent_id_str.clone();
|
||||
let message_hook = message.clone();
|
||||
let hb = hb_state.clone();
|
||||
let rf = rf_state.clone();
|
||||
let driver = llm_driver.clone();
|
||||
tokio::spawn(async move {
|
||||
crate::intelligence_hooks::post_conversation_hook(
|
||||
&agent_id_hook, &message_hook, &hb, &rf, driver,
|
||||
).await;
|
||||
});
|
||||
|
||||
StreamChatEvent::Complete {
|
||||
input_tokens: result.input_tokens,
|
||||
output_tokens: result.output_tokens,
|
||||
}
|
||||
}
|
||||
LoopEvent::Error(message) => {
|
||||
tracing::warn!("[agent_chat_stream] Error: {}", message);
|
||||
StreamChatEvent::Error { message: message.clone() }
|
||||
}
|
||||
};
|
||||
|
||||
if let Err(e) = app.emit("stream:chunk", serde_json::json!({
|
||||
"sessionId": session_id,
|
||||
"event": stream_event
|
||||
})) {
|
||||
tracing::warn!("[agent_chat_stream] Failed to emit event: {}", e);
|
||||
break;
|
||||
}
|
||||
|
||||
if matches!(event, LoopEvent::Complete(_) | LoopEvent::Error(_)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Ok(None) => {
|
||||
tracing::info!("[agent_chat_stream] Stream channel closed for session: {}", session_id);
|
||||
break;
|
||||
}
|
||||
Err(_) => {
|
||||
tracing::warn!("[agent_chat_stream] Stream idle timeout for session: {}", session_id);
|
||||
let _ = app.emit("stream:chunk", serde_json::json!({
|
||||
"sessionId": session_id,
|
||||
"event": StreamChatEvent::Error {
|
||||
message: "流式响应超时,请重试".to_string()
|
||||
}
|
||||
}));
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tracing::debug!("[agent_chat_stream] Stream processing ended for session: {}", session_id);
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
Reference in New Issue
Block a user