diff --git a/crates/zclaw-kernel/src/kernel/messaging.rs b/crates/zclaw-kernel/src/kernel/messaging.rs
index 5361d00..b3117fe 100644
--- a/crates/zclaw-kernel/src/kernel/messaging.rs
+++ b/crates/zclaw-kernel/src/kernel/messaging.rs
@@ -13,6 +13,109 @@ pub struct ChatModeConfig {
     pub subagent_enabled: Option<bool>,
 }
 
+/// Result of a successful schedule intent interception.
+pub struct ScheduleInterceptResult {
+    /// Pre-built streaming receiver with confirmation message.
+    pub rx: mpsc::Receiver<zclaw_runtime::LoopEvent>,
+    /// Human-readable task description.
+    pub task_description: String,
+    /// Natural language description of the schedule.
+    pub natural_description: String,
+    /// Cron expression.
+    pub cron_expression: String,
+}
+
+impl Kernel {
+    /// Try to intercept a schedule intent from the user's message.
+    ///
+    /// If the message contains a clear schedule intent (e.g., "每天早上9点提醒我查房"),
+    /// parse it, create a trigger, and return a streaming receiver with the
+    /// confirmation message. Returns `Ok(None)` if no interception occurred.
+    pub async fn try_intercept_schedule(
+        &self,
+        message: &str,
+        agent_id: &AgentId,
+    ) -> Result<Option<ScheduleInterceptResult>> {
+        if !zclaw_runtime::nl_schedule::has_schedule_intent(message) {
+            return Ok(None);
+        }
+
+        let parse_result = zclaw_runtime::nl_schedule::parse_nl_schedule(message, agent_id);
+
+        match parse_result {
+            zclaw_runtime::nl_schedule::ScheduleParseResult::Exact(ref parsed)
+                if parsed.confidence >= 0.8 =>
+            {
+                let trigger_id = format!(
+                    "sched_{}_{}",
+                    chrono::Utc::now().timestamp_millis(),
+                    &uuid::Uuid::new_v4().to_string()[..8]
+                );
+                let trigger_config = zclaw_hands::TriggerConfig {
+                    id: trigger_id.clone(),
+                    name: parsed.task_description.clone(),
+                    hand_id: "_reminder".to_string(),
+                    trigger_type: zclaw_hands::TriggerType::Schedule {
+                        cron: parsed.cron_expression.clone(),
+                    },
+                    enabled: true,
+                    max_executions_per_hour: 60,
+                };
+
+                match self.create_trigger(trigger_config).await {
+                    Ok(_entry) => {
+                        tracing::info!(
+                            "[Kernel] Schedule trigger created: {} (cron: {})",
+                            trigger_id, parsed.cron_expression
+                        );
+                        let confirm_msg = format!(
+                            "已为您设置定时任务:\n\n- **任务**:{}\n- **时间**:{}\n- **Cron**:`{}`\n\n任务已激活,将在设定时间自动执行。",
+                            parsed.task_description,
+                            parsed.natural_description,
+                            parsed.cron_expression,
+                        );
+
+                        let (tx, rx) = mpsc::channel(32);
+                        if tx.send(zclaw_runtime::LoopEvent::Delta(confirm_msg)).await.is_err() {
+                            tracing::warn!("[Kernel] Failed to send confirm msg to channel");
+                        }
+                        if tx.send(zclaw_runtime::LoopEvent::Complete(
+                            zclaw_runtime::AgentLoopResult {
+                                response: String::new(),
+                                input_tokens: 0,
+                                output_tokens: 0,
+                                iterations: 1,
+                            }
+                        )).await.is_err() {
+                            tracing::warn!("[Kernel] Failed to send complete to channel");
+                        }
+                        drop(tx);
+
+                        Ok(Some(ScheduleInterceptResult {
+                            rx,
+                            task_description: parsed.task_description.clone(),
+                            natural_description: parsed.natural_description.clone(),
+                            cron_expression: parsed.cron_expression.clone(),
+                        }))
+                    }
+                    Err(e) => {
+                        tracing::warn!(
+                            "[Kernel] Failed to create schedule trigger, falling through to LLM: {}", e
+                        );
+                        Ok(None)
+                    }
+                }
+            }
+            _ => {
+                tracing::debug!(
+                    "[Kernel] Schedule intent detected but not confident enough, falling through to LLM"
+                );
+                Ok(None)
+            }
+        }
+    }
+}
+
 use zclaw_runtime::{AgentLoop, tool::builtin::PathValidator};
 
 use super::Kernel;
diff --git a/crates/zclaw-kernel/src/kernel/mod.rs b/crates/zclaw-kernel/src/kernel/mod.rs
index 7e6593a..a991f26 100644
--- a/crates/zclaw-kernel/src/kernel/mod.rs
+++ b/crates/zclaw-kernel/src/kernel/mod.rs
@@ -30,6 +30,7 @@ use zclaw_hands::{HandRegistry, hands::{BrowserHand, QuizHand, ResearcherHand, C
 pub use adapters::KernelSkillExecutor;
 pub use adapters::KernelHandExecutor;
 pub use messaging::ChatModeConfig;
+pub use messaging::ScheduleInterceptResult;
 
 /// The ZCLAW Kernel
 pub struct Kernel {
diff --git a/desktop/src-tauri/src/kernel_commands/chat.rs b/desktop/src-tauri/src/kernel_commands/chat.rs
index 523a4fb..4e5df20 100644
--- a/desktop/src-tauri/src/kernel_commands/chat.rs
+++ b/desktop/src-tauri/src/kernel_commands/chat.rs
@@ -7,6 +7,7 @@ use zclaw_types::AgentId;
 
 use super::{validate_agent_id, KernelState, SessionStreamGuard, StreamCancelFlags};
 use crate::intelligence::validation::validate_string_length;
+use zclaw_runtime::LoopEvent;
 
 // ---------------------------------------------------------------------------
 // Request / Response types
@@ -60,6 +61,47 @@ pub enum StreamChatEvent {
     Error { message: String },
 }
 
+/// Translate a runtime LoopEvent into a Tauri StreamChatEvent.
+///
+/// Hand tools (name starts with "hand_") are mapped to HandStart/HandEnd
+/// variants; all other tool events use ToolStart/ToolEnd.
+fn translate_event(event: &zclaw_runtime::LoopEvent) -> StreamChatEvent {
+    match event {
+        LoopEvent::Delta(delta) => StreamChatEvent::Delta { delta: delta.clone() },
+        LoopEvent::ThinkingDelta(delta) => StreamChatEvent::ThinkingDelta { delta: delta.clone() },
+        LoopEvent::ToolStart { name, input } => {
+            if name.starts_with("hand_") {
+                StreamChatEvent::HandStart { name: name.clone(), params: input.clone() }
+            } else {
+                StreamChatEvent::ToolStart { name: name.clone(), input: input.clone() }
+            }
+        }
+        LoopEvent::ToolEnd { name, output } => {
+            if name.starts_with("hand_") {
+                StreamChatEvent::HandEnd { name: name.clone(), result: output.clone() }
+            } else {
+                StreamChatEvent::ToolEnd { name: name.clone(), output: output.clone() }
+            }
+        }
+        LoopEvent::SubtaskStatus { task_id, description, status, detail } => {
+            StreamChatEvent::SubtaskStatus {
+                task_id: task_id.clone(),
+                description: description.clone(),
+                status: status.clone(),
+                detail: detail.clone(),
+            }
+        }
+        LoopEvent::IterationStart { iteration, max_iterations } => {
+            StreamChatEvent::IterationStart { iteration: *iteration, max_iterations: *max_iterations }
+        }
+        LoopEvent::Complete(result) => StreamChatEvent::Complete {
+            input_tokens: result.input_tokens,
+            output_tokens: result.output_tokens,
+        },
+        LoopEvent::Error(message) => StreamChatEvent::Error { message: message.clone() },
+    }
+}
+
 /// Streaming chat request
 #[derive(Debug, Clone, Serialize, Deserialize)]
 #[serde(rename_all = "camelCase")]
@@ -218,152 +260,66 @@ pub async fn agent_chat_stream(
     ).await.unwrap_or_default();
 
     // --- Schedule intent interception ---
-    // If the user's message contains a schedule intent (e.g. "每天早上9点提醒我查房"),
-    // parse it with NlScheduleParser, create a trigger, and return confirmation
-    // directly without calling the LLM.
-    let mut captured_parsed: Option<zclaw_runtime::nl_schedule::ParsedSchedule> = None;
-
-    if zclaw_runtime::nl_schedule::has_schedule_intent(&message) {
-        let parse_result = zclaw_runtime::nl_schedule::parse_nl_schedule(&message, &id);
-
-        match parse_result {
-            zclaw_runtime::nl_schedule::ScheduleParseResult::Exact(ref parsed)
-                if parsed.confidence >= 0.8 =>
-            {
-                // Try to create a schedule trigger
-                let kernel_lock = state.lock().await;
-                if let Some(kernel) = kernel_lock.as_ref() {
-                    // Use UUID fragment to avoid collision under high concurrency
-                    let trigger_id = format!(
-                        "sched_{}_{}",
-                        chrono::Utc::now().timestamp_millis(),
-                        &uuid::Uuid::new_v4().to_string()[..8]
-                    );
-                    let trigger_config = zclaw_hands::TriggerConfig {
-                        id: trigger_id.clone(),
-                        name: parsed.task_description.clone(),
-                        hand_id: "_reminder".to_string(),
-                        trigger_type: zclaw_hands::TriggerType::Schedule {
-                            cron: parsed.cron_expression.clone(),
-                        },
-                        enabled: true,
-                        // 60/hour = once per minute max, reasonable for scheduled tasks
-                        max_executions_per_hour: 60,
-                    };
-
-                    match kernel.create_trigger(trigger_config).await {
-                        Ok(_entry) => {
-                            tracing::info!(
-                                "[agent_chat_stream] Schedule trigger created: {} (cron: {})",
-                                trigger_id, parsed.cron_expression
-                            );
-                            captured_parsed = Some(parsed.clone());
-                        }
-                        Err(e) => {
-                            tracing::warn!(
-                                "[agent_chat_stream] Failed to create schedule trigger, falling through to LLM: {}",
-                                e
-                            );
-                        }
-                    }
-                }
-            }
-            _ => {
-                // Ambiguous, Unclear, or low confidence — let LLM handle it naturally
-                tracing::debug!(
-                    "[agent_chat_stream] Schedule intent detected but not confident enough, falling through to LLM"
-                );
-            }
-        }
-    }
-
-    // Get the streaming receiver while holding the lock, then release it
-    // NOTE: When schedule_intercepted, llm_driver is None so post_conversation_hook
-    // (memory extraction, heartbeat, reflection) is intentionally skipped —
-    // schedule confirmations are system messages, not user conversations.
-    let (mut rx, llm_driver) = if let Some(parsed) = captured_parsed {
-        // Schedule was intercepted — build confirmation message directly
-        let confirm_msg = format!(
-            "已为您设置定时任务:\n\n- **任务**:{}\n- **时间**:{}\n- **Cron**:`{}`\n\n任务已激活,将在设定时间自动执行。",
-            parsed.task_description,
-            parsed.natural_description,
-            parsed.cron_expression,
-        );
-
-        let (tx, rx) = tokio::sync::mpsc::channel(32);
-        if tx.send(zclaw_runtime::LoopEvent::Delta(confirm_msg)).await.is_err() {
-            tracing::warn!("[agent_chat_stream] Failed to send confirm msg to new channel");
-        }
-        if tx.send(zclaw_runtime::LoopEvent::Complete(
-            zclaw_runtime::AgentLoopResult {
-                response: String::new(),
-                input_tokens: 0,
-                output_tokens: 0,
-                iterations: 1,
-            }
-        )).await.is_err() {
-            tracing::warn!("[agent_chat_stream] Failed to send complete to new channel");
-        }
-        drop(tx);
-        (rx, None)
-    } else {
-        // Normal LLM chat path
+    // Try to intercept schedule intents (e.g. "每天早上9点提醒我查房") at the kernel level.
+    // If intercepted, returns a pre-built confirmation stream — no LLM call needed.
+    let (mut rx, llm_driver) = {
         let kernel_lock = state.lock().await;
         let kernel = kernel_lock.as_ref()
-            .ok_or_else(|| {
-                // Cleanup on error: release guard + cancel flag
-                err_cleanup_flag.store(false, std::sync::atomic::Ordering::SeqCst);
-                err_cleanup_guard.remove(&err_cleanup_session_id);
-                err_cleanup_cancel.remove(&err_cleanup_session_id);
-                "Kernel not initialized. Call kernel_init first.".to_string()
-            })?;
+            .ok_or_else(|| "Kernel not initialized. Call kernel_init first.".to_string())?;
 
-        let driver = Some(kernel.driver());
-
-        let prompt_arg = if enhanced_prompt.is_empty() { None } else { Some(enhanced_prompt) };
-
-        let session_id_parsed = if session_id.is_empty() {
-            None
-        } else {
-            match uuid::Uuid::parse_str(&session_id) {
-                Ok(uuid) => Some(zclaw_types::SessionId::from_uuid(uuid)),
-                Err(e) => {
-                    // Cleanup on error
-                    err_cleanup_flag.store(false, std::sync::atomic::Ordering::SeqCst);
-                    err_cleanup_guard.remove(&err_cleanup_session_id);
-                    err_cleanup_cancel.remove(&err_cleanup_session_id);
-                    return Err(format!(
-                        "Invalid session_id '{}': {}. Cannot reuse conversation context.",
-                        session_id, e
-                    ));
-                }
+        match kernel.try_intercept_schedule(&message, &id).await {
+            Ok(Some(intercept)) => {
+                tracing::info!("[agent_chat_stream] Schedule intercepted: {}", intercept.task_description);
+                (intercept.rx, None)
             }
-        };
-        // Build chat mode config from request parameters
-        let chat_mode_config = zclaw_kernel::ChatModeConfig {
-            thinking_enabled: request.thinking_enabled,
-            reasoning_effort: request.reasoning_effort.clone(),
-            plan_mode: request.plan_mode,
-            subagent_enabled: request.subagent_enabled,
-        };
+            _ => {
+                // No interception or error — normal LLM chat path
+                let driver = Some(kernel.driver());
 
-        let rx = kernel.send_message_stream_with_prompt(
-            &id,
-            message.clone(),
-            prompt_arg,
-            session_id_parsed,
-            Some(chat_mode_config),
-            request.model.clone(),
-        )
-        .await
-        .map_err(|e| {
-            // Cleanup on error
-            err_cleanup_flag.store(false, std::sync::atomic::Ordering::SeqCst);
-            err_cleanup_guard.remove(&err_cleanup_session_id);
-            err_cleanup_cancel.remove(&err_cleanup_session_id);
-            format!("Failed to start streaming: {}", e)
-        })?;
-        (rx, driver)
+                let prompt_arg = if enhanced_prompt.is_empty() { None } else { Some(enhanced_prompt) };
+
+                let session_id_parsed = if session_id.is_empty() {
+                    None
+                } else {
+                    match uuid::Uuid::parse_str(&session_id) {
+                        Ok(uuid) => Some(zclaw_types::SessionId::from_uuid(uuid)),
+                        Err(e) => {
+                            err_cleanup_flag.store(false, std::sync::atomic::Ordering::SeqCst);
+                            err_cleanup_guard.remove(&err_cleanup_session_id);
+                            err_cleanup_cancel.remove(&err_cleanup_session_id);
+                            return Err(format!(
+                                "Invalid session_id '{}': {}. Cannot reuse conversation context.",
+                                session_id, e
+                            ));
+                        }
+                    }
+                };
+
+                let chat_mode_config = zclaw_kernel::ChatModeConfig {
+                    thinking_enabled: request.thinking_enabled,
+                    reasoning_effort: request.reasoning_effort.clone(),
+                    plan_mode: request.plan_mode,
+                    subagent_enabled: request.subagent_enabled,
+                };
+
+                let rx = kernel.send_message_stream_with_prompt(
+                    &id,
+                    message.clone(),
+                    prompt_arg,
+                    session_id_parsed,
+                    Some(chat_mode_config),
+                    request.model.clone(),
+                )
+                .await
+                .map_err(|e| {
+                    err_cleanup_flag.store(false, std::sync::atomic::Ordering::SeqCst);
+                    err_cleanup_guard.remove(&err_cleanup_session_id);
+                    err_cleanup_cancel.remove(&err_cleanup_session_id);
+                    format!("Failed to start streaming: {}", e)
+                })?;
+                (rx, driver)
+            }
+        }
     };
 
     let hb_state = heartbeat_state.inner().clone();
@@ -415,69 +371,23 @@
 
         match tokio::time::timeout(stream_timeout, rx.recv()).await {
             Ok(Some(event)) => {
-                let stream_event = match &event {
-                    LoopEvent::Delta(delta) => {
-                        tracing::trace!("[agent_chat_stream] Delta: {} bytes", delta.len());
-                        StreamChatEvent::Delta { delta: delta.clone() }
-                    }
-                    LoopEvent::ThinkingDelta(delta) => {
-                        tracing::trace!("[agent_chat_stream] ThinkingDelta: {} bytes", delta.len());
-                        StreamChatEvent::ThinkingDelta { delta: delta.clone() }
-                    }
-                    LoopEvent::ToolStart { name, input } => {
-                        tracing::debug!("[agent_chat_stream] ToolStart: {}", name);
-                        if name.starts_with("hand_") {
-                            StreamChatEvent::HandStart { name: name.clone(), params: input.clone() }
-                        } else {
-                            StreamChatEvent::ToolStart { name: name.clone(), input: input.clone() }
-                        }
-                    }
-                    LoopEvent::ToolEnd { name, output } => {
-                        tracing::debug!("[agent_chat_stream] ToolEnd: {}", name);
-                        if name.starts_with("hand_") {
-                            StreamChatEvent::HandEnd { name: name.clone(), result: output.clone() }
-                        } else {
-                            StreamChatEvent::ToolEnd { name: name.clone(), output: output.clone() }
-                        }
-                    }
-                    LoopEvent::SubtaskStatus { task_id, description, status, detail } => {
-                        tracing::debug!("[agent_chat_stream] SubtaskStatus: {} - {} (id={})", description, status, task_id);
-                        StreamChatEvent::SubtaskStatus {
-                            task_id: task_id.clone(),
-                            description: description.clone(),
-                            status: status.clone(),
-                            detail: detail.clone(),
-                        }
-                    }
-                    LoopEvent::IterationStart { iteration, max_iterations } => {
-                        tracing::debug!("[agent_chat_stream] IterationStart: {}/{}", iteration, max_iterations);
-                        StreamChatEvent::IterationStart { iteration: *iteration, max_iterations: *max_iterations }
-                    }
-                    LoopEvent::Complete(result) => {
-                        tracing::info!("[agent_chat_stream] Complete: input_tokens={}, output_tokens={}",
-                            result.input_tokens, result.output_tokens);
+                // Fire post-conversation hooks before translating (memory extraction, heartbeat, reflection)
+                if let LoopEvent::Complete(result) = &event {
+                    tracing::info!("[agent_chat_stream] Complete: input_tokens={}, output_tokens={}",
+                        result.input_tokens, result.output_tokens);
+                    let agent_id_hook = agent_id_str.clone();
+                    let message_hook = message.clone();
+                    let hb = hb_state.clone();
+                    let rf = rf_state.clone();
+                    let driver = llm_driver.clone();
+                    tokio::spawn(async move {
+                        crate::intelligence_hooks::post_conversation_hook(
+                            &agent_id_hook, &message_hook, &hb, &rf, driver,
+                        ).await;
+                    });
+                }
 
-                        let agent_id_hook = agent_id_str.clone();
-                        let message_hook = message.clone();
-                        let hb = hb_state.clone();
-                        let rf = rf_state.clone();
-                        let driver = llm_driver.clone();
-                        tokio::spawn(async move {
-                            crate::intelligence_hooks::post_conversation_hook(
-                                &agent_id_hook, &message_hook, &hb, &rf, driver,
-                            ).await;
-                        });
-
-                        StreamChatEvent::Complete {
-                            input_tokens: result.input_tokens,
-                            output_tokens: result.output_tokens,
-                        }
-                    }
-                    LoopEvent::Error(message) => {
-                        tracing::warn!("[agent_chat_stream] Error: {}", message);
-                        StreamChatEvent::Error { message: message.clone() }
-                    }
-                };
+                let stream_event = translate_event(&event);
 
                 if let Err(e) = app.emit("stream:chunk", serde_json::json!({
                     "sessionId": session_id,