Files
zclaw_openfang/desktop/src-tauri/src/kernel_commands/chat.rs
iven 924ad5a6ec
Some checks failed
CI / Lint & TypeCheck (push) Has been cancelled
CI / Unit Tests (push) Has been cancelled
CI / Build Frontend (push) Has been cancelled
CI / Rust Check (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
CI / E2E Tests (push) Has been cancelled
fix(audit): Batch 0-1 文档校准 + let _ = 静默错误修复
Batch 0:
- TRUTH.md 中间件层 14→15 (补 EvolutionMiddleware@78)
- wiki/middleware.md 同步 15 层 + 优先级分类更新
- Store 数字确认 25 个

Batch 1:
- approvals.rs: 3 处 map_err+let _ = 简化为 if let Err
- director.rs: oneshot send 失败添加 debug 日志
- task.rs: 4 处子任务状态更新添加 debug 日志
- chat.rs: 流消息发送和事件 emit 添加 warn/debug 日志
- heartbeat.rs: 告警广播添加 debug 日志 + break 优化

全量测试通过: 719 passed, 0 failed
2026-04-19 08:30:33 +08:00

549 lines
24 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

//! Chat commands: send message, streaming chat
use std::sync::Arc;
use serde::{Deserialize, Serialize};
use tauri::{AppHandle, Emitter, State};
use zclaw_types::AgentId;
use super::{validate_agent_id, KernelState, SessionStreamGuard, StreamCancelFlags};
use crate::intelligence::validation::validate_string_length;
// ---------------------------------------------------------------------------
// Request / Response types
// ---------------------------------------------------------------------------
/// Chat request payload for the one-shot (non-streaming) `agent_chat` command.
///
/// All mode fields are optional; when every one is `None`, `agent_chat` skips
/// building a `ChatModeConfig` override and the agent's own configuration is
/// used unchanged.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ChatRequest {
    // Target agent identifier; validated and parsed into an `AgentId` by the command.
    pub agent_id: String,
    // User message text (length capped at 100000 by the command).
    pub message: String,
    /// Enable extended thinking/reasoning
    #[serde(default)]
    pub thinking_enabled: Option<bool>,
    /// Reasoning effort level (low/medium/high)
    #[serde(default)]
    pub reasoning_effort: Option<String>,
    /// Enable plan mode
    #[serde(default)]
    pub plan_mode: Option<bool>,
    /// Enable sub-agent delegation (Ultra mode only)
    #[serde(default)]
    pub subagent_enabled: Option<bool>,
    /// Model override — the model selected in the UI takes priority over the
    /// agent's configured default model.
    #[serde(default)]
    pub model: Option<String>,
}
/// Chat response returned by `agent_chat`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ChatResponse {
    // Full assistant reply text.
    pub content: String,
    // Token counts as reported by the kernel's response.
    pub input_tokens: u32,
    pub output_tokens: u32,
}
/// Streaming chat event for Tauri emission
///
/// Serialized with an adjacent `type` tag and camelCase fields, and emitted as
/// the `event` field of the `stream:chunk` payload (see `agent_chat_stream`).
/// Each variant mirrors a `zclaw_runtime::LoopEvent`; keep this enum, the
/// TypeScript types, and the frontend handlers in sync (see the mapping
/// checklist inside `agent_chat_stream`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase", tag = "type")]
pub enum StreamChatEvent {
    // Incremental assistant text.
    Delta { delta: String },
    // Incremental extended-thinking text.
    ThinkingDelta { delta: String },
    // Tool invocation lifecycle (names NOT starting with "hand_").
    ToolStart { name: String, input: serde_json::Value },
    ToolEnd { name: String, output: serde_json::Value },
    // Sub-task progress updates.
    SubtaskStatus { task_id: String, description: String, status: String, detail: Option<String> },
    // Agent loop iteration marker.
    IterationStart { iteration: usize, max_iterations: usize },
    // Hand invocation lifecycle (tool names starting with "hand_").
    HandStart { name: String, params: serde_json::Value },
    HandEnd { name: String, result: serde_json::Value },
    // Terminal events: exactly one of these ends the stream.
    Complete { input_tokens: u32, output_tokens: u32 },
    Error { message: String },
}
/// Streaming chat request payload for `agent_chat_stream`.
///
/// Identical to `ChatRequest` plus a `session_id` used for event routing,
/// per-session concurrency guarding, and conversation-context reuse.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct StreamChatRequest {
    // Target agent identifier; validated and parsed into an `AgentId`.
    pub agent_id: String,
    // Session key: routes `stream:chunk` events, enforces one active stream per
    // session, and (when a valid UUID) selects the conversation context.
    pub session_id: String,
    // User message text (length capped at 100000 by the command).
    pub message: String,
    /// Enable extended thinking/reasoning
    #[serde(default)]
    pub thinking_enabled: Option<bool>,
    /// Reasoning effort level (low/medium/high)
    #[serde(default)]
    pub reasoning_effort: Option<String>,
    /// Enable plan mode
    #[serde(default)]
    pub plan_mode: Option<bool>,
    /// Enable sub-agent delegation (Ultra mode only)
    #[serde(default)]
    pub subagent_enabled: Option<bool>,
    /// Model override — the model selected in the UI takes priority over the
    /// agent's configured default model.
    #[serde(default)]
    pub model: Option<String>,
}
// ---------------------------------------------------------------------------
// Commands
// ---------------------------------------------------------------------------
/// Send a message to an agent and wait for the complete response.
///
/// Validates the agent id and message length, then forwards the message to the
/// kernel. A `ChatModeConfig` override is only constructed when the caller set
/// at least one mode field; otherwise the agent's defaults apply.
///
/// # Errors
/// Returns a human-readable error string when validation fails, the kernel is
/// not initialized, the agent id does not parse, or the chat call itself fails.
// @reserved: agent chat (desktop uses ChatStore/SaaS relay)
// @connected
#[tauri::command]
pub async fn agent_chat(
    state: State<'_, KernelState>,
    request: ChatRequest,
) -> Result<ChatResponse, String> {
    validate_agent_id(&request.agent_id)?;
    validate_string_length(&request.message, "message", 100000)
        .map_err(|e| format!("Invalid message: {}", e))?;

    let kernel_lock = state.lock().await;
    let kernel = kernel_lock
        .as_ref()
        .ok_or_else(|| "Kernel not initialized. Call kernel_init first.".to_string())?;

    let id: AgentId = request
        .agent_id
        .parse()
        .map_err(|_| "Invalid agent ID format".to_string())?;

    // Only build a chat-mode override when the caller explicitly set a field.
    let wants_override = request.thinking_enabled.is_some()
        || request.reasoning_effort.is_some()
        || request.plan_mode.is_some()
        || request.subagent_enabled.is_some();
    let chat_mode = wants_override.then(|| zclaw_kernel::ChatModeConfig {
        thinking_enabled: request.thinking_enabled,
        reasoning_effort: request.reasoning_effort.clone(),
        plan_mode: request.plan_mode,
        subagent_enabled: request.subagent_enabled,
    });

    let reply = kernel
        .send_message_with_chat_mode(&id, request.message, chat_mode, request.model)
        .await
        .map_err(|e| format!("Chat failed: {}", e))?;

    Ok(ChatResponse {
        content: reply.content,
        input_tokens: reply.input_tokens,
        output_tokens: reply.output_tokens,
    })
}
/// Send a message to an agent with streaming response
///
/// This command initiates a streaming chat session. Events are emitted
/// via Tauri's event system with the name "stream:chunk" and include
/// the session_id for routing.
///
/// Returns `Ok(())` as soon as the stream is started; chunks are delivered
/// asynchronously by the spawned task at the bottom of this function. At most
/// one stream may be active per session id (atomic guard in
/// `SessionStreamGuard`); cancellation is cooperative via a per-session flag
/// in `StreamCancelFlags` (see `cancel_stream`). Every pre-spawn error path
/// releases both the guard and the cancellation flag.
// @connected
#[tauri::command]
pub async fn agent_chat_stream(
    app: AppHandle,
    state: State<'_, KernelState>,
    identity_state: State<'_, crate::intelligence::IdentityManagerState>,
    heartbeat_state: State<'_, crate::intelligence::HeartbeatEngineState>,
    reflection_state: State<'_, crate::intelligence::ReflectionEngineState>,
    stream_guard: State<'_, SessionStreamGuard>,
    cancel_flags: State<'_, StreamCancelFlags>,
    request: StreamChatRequest,
) -> Result<(), String> {
    // Input validation: agent id format + message length cap.
    validate_agent_id(&request.agent_id)?;
    validate_string_length(&request.message, "message", 100000)
        .map_err(|e| format!("Invalid message: {}", e))?;
    let id: AgentId = request.agent_id.parse()
        .map_err(|_| "Invalid agent ID format".to_string())?;
    let session_id = request.session_id.clone();
    let agent_id_str = request.agent_id.clone();
    let message = request.message.clone();
    // Session-level concurrency guard using atomic flag
    let session_active = stream_guard
        .entry(session_id.clone())
        .or_insert_with(|| Arc::new(std::sync::atomic::AtomicBool::new(false)));
    // Atomically set flag from false→true, fail if already true
    if session_active
        .compare_exchange(false, true, std::sync::atomic::Ordering::SeqCst, std::sync::atomic::Ordering::SeqCst)
        .is_err()
    {
        tracing::warn!(
            "[agent_chat_stream] Session {} already has an active stream — rejecting",
            session_id
        );
        return Err(format!("Session {} already has an active stream", session_id));
    }
    // NOTE(review): `session_active` keeps a map entry guard alive across the
    // `.await`s below — presumably safe because each session id has a single
    // in-flight command; confirm no other task can block on the same map shard.
    // Prepare cleanup resources for error paths (before spawn takes ownership)
    let err_cleanup_guard = stream_guard.inner().clone();
    let err_cleanup_cancel = cancel_flags.inner().clone();
    let err_cleanup_session_id = session_id.clone();
    let err_cleanup_flag = Arc::clone(&*session_active);
    // Register cancellation flag for this session
    let cancel_flag = cancel_flags
        .entry(session_id.clone())
        .or_insert_with(|| Arc::new(std::sync::atomic::AtomicBool::new(false)));
    // Ensure flag is reset (in case of stale entry from a previous stream)
    cancel_flag.store(false, std::sync::atomic::Ordering::SeqCst);
    let cancel_clone = Arc::clone(&*cancel_flag);
    let cancel_flags_map: StreamCancelFlags = cancel_flags.inner().clone();
    // AUTO-INIT HEARTBEAT: lazily create and start a heartbeat engine the
    // first time this agent streams.
    {
        let mut engines = heartbeat_state.lock().await;
        if !engines.contains_key(&request.agent_id) {
            let engine = crate::intelligence::heartbeat::HeartbeatEngine::new(
                request.agent_id.clone(),
                None,
            );
            engines.insert(request.agent_id.clone(), engine);
            // Start the engine after insertion via the stored reference
            if let Some(e) = engines.get(&request.agent_id) {
                e.start().await;
            }
            tracing::info!("[agent_chat_stream] Auto-initialized and started heartbeat for agent: {}", request.agent_id);
        }
    }
    // PRE-CONVERSATION: Build intelligence-enhanced system prompt
    let enhanced_prompt = crate::intelligence_hooks::pre_conversation_hook(
        &request.agent_id,
        &request.message,
        &identity_state,
    ).await.unwrap_or_default();
    // --- Schedule intent interception ---
    // If the user's message contains a schedule intent (e.g. "每天早上9点提醒我查房" —
    // "remind me of rounds at 9am every day"), parse it with NlScheduleParser,
    // create a trigger, and return confirmation directly without calling the LLM.
    let mut captured_parsed: Option<zclaw_runtime::nl_schedule::ParsedSchedule> = None;
    if zclaw_runtime::nl_schedule::has_schedule_intent(&message) {
        let parse_result = zclaw_runtime::nl_schedule::parse_nl_schedule(&message, &id);
        match parse_result {
            // Only intercept high-confidence exact parses; everything else
            // falls through to the normal LLM path.
            zclaw_runtime::nl_schedule::ScheduleParseResult::Exact(ref parsed)
                if parsed.confidence >= 0.8 =>
            {
                // Try to create a schedule trigger
                let kernel_lock = state.lock().await;
                if let Some(kernel) = kernel_lock.as_ref() {
                    // Use UUID fragment to avoid collision under high concurrency
                    let trigger_id = format!(
                        "sched_{}_{}",
                        chrono::Utc::now().timestamp_millis(),
                        &uuid::Uuid::new_v4().to_string()[..8]
                    );
                    let trigger_config = zclaw_hands::TriggerConfig {
                        id: trigger_id.clone(),
                        name: parsed.task_description.clone(),
                        hand_id: "_reminder".to_string(),
                        trigger_type: zclaw_hands::TriggerType::Schedule {
                            cron: parsed.cron_expression.clone(),
                        },
                        enabled: true,
                        // 60/hour = once per minute max, reasonable for scheduled tasks
                        max_executions_per_hour: 60,
                    };
                    match kernel.create_trigger(trigger_config).await {
                        Ok(_entry) => {
                            tracing::info!(
                                "[agent_chat_stream] Schedule trigger created: {} (cron: {})",
                                trigger_id, parsed.cron_expression
                            );
                            captured_parsed = Some(parsed.clone());
                        }
                        Err(e) => {
                            tracing::warn!(
                                "[agent_chat_stream] Failed to create schedule trigger, falling through to LLM: {}",
                                e
                            );
                        }
                    }
                }
            }
            _ => {
                // Ambiguous, Unclear, or low confidence — let LLM handle it naturally
                tracing::debug!(
                    "[agent_chat_stream] Schedule intent detected but not confident enough, falling through to LLM"
                );
            }
        }
    }
    // Get the streaming receiver while holding the lock, then release it
    // NOTE: When schedule_intercepted, llm_driver is None so post_conversation_hook
    // (memory extraction, heartbeat, reflection) is intentionally skipped —
    // schedule confirmations are system messages, not user conversations.
    let (mut rx, llm_driver) = if let Some(parsed) = captured_parsed {
        // Schedule was intercepted — build confirmation message directly.
        // (The literal below is user-facing Chinese: "Scheduled task set up:
        // task / time / cron … the task is active and will run automatically".)
        let confirm_msg = format!(
            "已为您设置定时任务:\n\n- **任务**{}\n- **时间**{}\n- **Cron**`{}`\n\n任务已激活,将在设定时间自动执行。",
            parsed.task_description,
            parsed.natural_description,
            parsed.cron_expression,
        );
        // Synthesize a tiny channel that mimics a normal LLM stream:
        // one Delta followed by a zero-token Complete, then drop the sender.
        let (tx, rx) = tokio::sync::mpsc::channel(32);
        if tx.send(zclaw_runtime::LoopEvent::Delta(confirm_msg)).await.is_err() {
            tracing::warn!("[agent_chat_stream] Failed to send confirm msg to new channel");
        }
        if tx.send(zclaw_runtime::LoopEvent::Complete(
            zclaw_runtime::AgentLoopResult {
                response: String::new(),
                input_tokens: 0,
                output_tokens: 0,
                iterations: 1,
            }
        )).await.is_err() {
            tracing::warn!("[agent_chat_stream] Failed to send complete to new channel");
        }
        drop(tx);
        (rx, None)
    } else {
        // Normal LLM chat path
        let kernel_lock = state.lock().await;
        let kernel = kernel_lock.as_ref()
            .ok_or_else(|| {
                // Cleanup on error: release guard + cancel flag
                err_cleanup_flag.store(false, std::sync::atomic::Ordering::SeqCst);
                err_cleanup_guard.remove(&err_cleanup_session_id);
                err_cleanup_cancel.remove(&err_cleanup_session_id);
                "Kernel not initialized. Call kernel_init first.".to_string()
            })?;
        let driver = Some(kernel.driver());
        let prompt_arg = if enhanced_prompt.is_empty() { None } else { Some(enhanced_prompt) };
        // An empty session_id means "no context reuse"; a non-empty one must be
        // a valid UUID or the whole request is rejected.
        let session_id_parsed = if session_id.is_empty() {
            None
        } else {
            match uuid::Uuid::parse_str(&session_id) {
                Ok(uuid) => Some(zclaw_types::SessionId::from_uuid(uuid)),
                Err(e) => {
                    // Cleanup on error
                    err_cleanup_flag.store(false, std::sync::atomic::Ordering::SeqCst);
                    err_cleanup_guard.remove(&err_cleanup_session_id);
                    err_cleanup_cancel.remove(&err_cleanup_session_id);
                    return Err(format!(
                        "Invalid session_id '{}': {}. Cannot reuse conversation context.",
                        session_id, e
                    ));
                }
            }
        };
        // Build chat mode config from request parameters
        let chat_mode_config = zclaw_kernel::ChatModeConfig {
            thinking_enabled: request.thinking_enabled,
            reasoning_effort: request.reasoning_effort.clone(),
            plan_mode: request.plan_mode,
            subagent_enabled: request.subagent_enabled,
        };
        let rx = kernel.send_message_stream_with_prompt(
            &id,
            message.clone(),
            prompt_arg,
            session_id_parsed,
            Some(chat_mode_config),
            request.model.clone(),
        )
        .await
        .map_err(|e| {
            // Cleanup on error
            err_cleanup_flag.store(false, std::sync::atomic::Ordering::SeqCst);
            err_cleanup_guard.remove(&err_cleanup_session_id);
            err_cleanup_cancel.remove(&err_cleanup_session_id);
            format!("Failed to start streaming: {}", e)
        })?;
        (rx, driver)
    };
    let hb_state = heartbeat_state.inner().clone();
    let rf_state = reflection_state.inner().clone();
    // Clone the guard map for cleanup in the spawned task
    let guard_map: SessionStreamGuard = stream_guard.inner().clone();
    // Spawn a task to process stream events.
    // The session_active flag is cleared when task completes.
    let guard_clone = Arc::clone(&*session_active);
    tokio::spawn(async move {
        use zclaw_runtime::LoopEvent;
        tracing::debug!("[agent_chat_stream] Starting stream processing for session: {}", session_id);
        // Idle timeout: if no event arrives within this window, the stream is
        // aborted with a user-facing timeout error.
        let stream_timeout = tokio::time::Duration::from_secs(300);
        loop {
            // === LoopEvent → StreamChatEvent mapping ===
            //
            // COMPLETENESS CHECKLIST: When adding a new LoopEvent variant, you MUST:
            // 1. Add a match arm below
            // 2. Add the corresponding StreamChatEvent variant (see struct defs above)
            // 3. Add the TypeScript type in desktop/src/lib/kernel-types.ts
            // 4. Add a handler in desktop/src/lib/kernel-chat.ts
            //
            // Current mapping (LoopEvent → StreamChatEvent):
            // Delta → Delta
            // ThinkingDelta → ThinkingDelta
            // ToolStart → ToolStart / HandStart (if name starts with "hand_")
            // ToolEnd → ToolEnd / HandEnd (if name starts with "hand_")
            // SubtaskStatus → SubtaskStatus
            // IterationStart → IterationStart
            // Complete → Complete
            // Error → Error
            // ============================================
            // Check cancellation flag before each recv
            if cancel_clone.load(std::sync::atomic::Ordering::SeqCst) {
                tracing::info!("[agent_chat_stream] Stream cancelled for session: {}", session_id);
                if let Err(e) = app.emit("stream:chunk", serde_json::json!({
                    "sessionId": session_id,
                    "event": StreamChatEvent::Error { message: "已取消".to_string() }
                })) {
                    tracing::debug!("[agent_chat_stream] Failed to emit cancel event: {}", e);
                }
                break;
            }
            match tokio::time::timeout(stream_timeout, rx.recv()).await {
                Ok(Some(event)) => {
                    let stream_event = match &event {
                        LoopEvent::Delta(delta) => {
                            tracing::trace!("[agent_chat_stream] Delta: {} bytes", delta.len());
                            StreamChatEvent::Delta { delta: delta.clone() }
                        }
                        LoopEvent::ThinkingDelta(delta) => {
                            tracing::trace!("[agent_chat_stream] ThinkingDelta: {} bytes", delta.len());
                            StreamChatEvent::ThinkingDelta { delta: delta.clone() }
                        }
                        LoopEvent::ToolStart { name, input } => {
                            tracing::debug!("[agent_chat_stream] ToolStart: {}", name);
                            // "hand_" prefix routes to the Hand* variants so the
                            // frontend can render hands and tools differently.
                            if name.starts_with("hand_") {
                                StreamChatEvent::HandStart { name: name.clone(), params: input.clone() }
                            } else {
                                StreamChatEvent::ToolStart { name: name.clone(), input: input.clone() }
                            }
                        }
                        LoopEvent::ToolEnd { name, output } => {
                            tracing::debug!("[agent_chat_stream] ToolEnd: {}", name);
                            if name.starts_with("hand_") {
                                StreamChatEvent::HandEnd { name: name.clone(), result: output.clone() }
                            } else {
                                StreamChatEvent::ToolEnd { name: name.clone(), output: output.clone() }
                            }
                        }
                        LoopEvent::SubtaskStatus { task_id, description, status, detail } => {
                            tracing::debug!("[agent_chat_stream] SubtaskStatus: {} - {} (id={})", description, status, task_id);
                            StreamChatEvent::SubtaskStatus {
                                task_id: task_id.clone(),
                                description: description.clone(),
                                status: status.clone(),
                                detail: detail.clone(),
                            }
                        }
                        LoopEvent::IterationStart { iteration, max_iterations } => {
                            tracing::debug!("[agent_chat_stream] IterationStart: {}/{}", iteration, max_iterations);
                            StreamChatEvent::IterationStart { iteration: *iteration, max_iterations: *max_iterations }
                        }
                        LoopEvent::Complete(result) => {
                            tracing::info!("[agent_chat_stream] Complete: input_tokens={}, output_tokens={}",
                                result.input_tokens, result.output_tokens);
                            // POST-CONVERSATION: fire-and-forget hook (memory,
                            // heartbeat, reflection); runs off the stream task
                            // so it never delays the Complete event.
                            let agent_id_hook = agent_id_str.clone();
                            let message_hook = message.clone();
                            let hb = hb_state.clone();
                            let rf = rf_state.clone();
                            let driver = llm_driver.clone();
                            tokio::spawn(async move {
                                crate::intelligence_hooks::post_conversation_hook(
                                    &agent_id_hook, &message_hook, &hb, &rf, driver,
                                ).await;
                            });
                            StreamChatEvent::Complete {
                                input_tokens: result.input_tokens,
                                output_tokens: result.output_tokens,
                            }
                        }
                        // `message` here shadows the outer user-message binding;
                        // it is the error text from the runtime.
                        LoopEvent::Error(message) => {
                            tracing::warn!("[agent_chat_stream] Error: {}", message);
                            StreamChatEvent::Error { message: message.clone() }
                        }
                    };
                    if let Err(e) = app.emit("stream:chunk", serde_json::json!({
                        "sessionId": session_id,
                        "event": stream_event
                    })) {
                        tracing::warn!("[agent_chat_stream] Failed to emit event: {}", e);
                        break;
                    }
                    // Complete and Error are terminal events.
                    if matches!(event, LoopEvent::Complete(_) | LoopEvent::Error(_)) {
                        break;
                    }
                }
                Ok(None) => {
                    tracing::info!("[agent_chat_stream] Stream channel closed for session: {}", session_id);
                    break;
                }
                Err(_) => {
                    // Idle timeout elapsed with no event; the literal below is
                    // user-facing Chinese: "stream response timed out, please retry".
                    tracing::warn!("[agent_chat_stream] Stream idle timeout for session: {}", session_id);
                    if let Err(e) = app.emit("stream:chunk", serde_json::json!({
                        "sessionId": session_id,
                        "event": StreamChatEvent::Error {
                            message: "流式响应超时,请重试".to_string()
                        }
                    })) {
                        tracing::debug!("[agent_chat_stream] Failed to emit timeout event: {}", e);
                    }
                    break;
                }
            }
        }
        tracing::debug!("[agent_chat_stream] Stream processing ended for session: {}", session_id);
        // Release session lock and clean up DashMap entries to prevent memory leaks.
        // Use compare_exchange to only remove if we still own the flag (guards against
        // a new stream for the same session_id starting after we broke out of the loop).
        if guard_clone.compare_exchange(true, false, std::sync::atomic::Ordering::SeqCst, std::sync::atomic::Ordering::SeqCst).is_ok() {
            guard_map.remove(&session_id);
        }
        // Clean up cancellation flag (always safe — cancel is session-scoped)
        cancel_flags_map.remove(&session_id);
    });
    Ok(())
}
/// Cancel an active stream for a given session.
///
/// Sets the per-session cancellation flag; the streaming task polls the flag
/// on each loop iteration, emits a final error event, and cleans up. An
/// unknown session id is treated as a no-op, never an error.
// @connected
#[tauri::command]
pub async fn cancel_stream(
    cancel_flags: State<'_, StreamCancelFlags>,
    session_id: String,
) -> Result<(), String> {
    match cancel_flags.get(&session_id) {
        Some(flag) => {
            flag.store(true, std::sync::atomic::Ordering::SeqCst);
            tracing::info!("[cancel_stream] Cancel requested for session: {}", session_id);
        }
        None => {
            // No active stream for this session — not an error, just a no-op
            tracing::debug!("[cancel_stream] No active stream for session: {}", session_id);
        }
    }
    Ok(())
}