Files
zclaw_openfang/crates/zclaw-kernel/src/kernel/messaging.rs
iven ae7322e610
Some checks failed
CI / Lint & TypeCheck (push) Has been cancelled
CI / Unit Tests (push) Has been cancelled
CI / Build Frontend (push) Has been cancelled
CI / Rust Check (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
CI / E2E Tests (push) Has been cancelled
refactor(kernel,desktop): chat.rs 瘦身 Phase 2 — 548→458行 (-16%)
- 提取 translate_event() 函数: LoopEvent→StreamChatEvent 翻译独立
- 提取 Kernel::try_intercept_schedule(): 调度拦截下沉到 kernel
- 新增 ScheduleInterceptResult 类型导出
- 所有缝测试 14/14 PASS,无回归

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-21 22:25:10 +08:00

496 lines
23 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

//! Message sending (non-streaming, streaming, system prompt building)
use tokio::sync::mpsc;
use zclaw_types::{AgentId, Result};
/// Chat mode configuration passed from the frontend.
/// Controls thinking, reasoning, plan mode, and sub-agent behavior.
#[derive(Debug, Clone)]
pub struct ChatModeConfig {
pub thinking_enabled: Option<bool>,
pub reasoning_effort: Option<String>,
pub plan_mode: Option<bool>,
pub subagent_enabled: Option<bool>,
}
/// Result of a successful schedule intent interception.
pub struct ScheduleInterceptResult {
/// Pre-built streaming receiver with confirmation message.
pub rx: mpsc::Receiver<zclaw_runtime::LoopEvent>,
/// Human-readable task description.
pub task_description: String,
/// Natural language description of the schedule.
pub natural_description: String,
/// Cron expression.
pub cron_expression: String,
}
impl Kernel {
/// Try to intercept a schedule intent from the user's message.
///
/// If the message contains a clear schedule intent (e.g., "每天早上9点提醒我查房"),
/// parse it, create a trigger, and return a streaming receiver with the
/// confirmation message. Returns `Ok(None)` if no interception occurred.
pub async fn try_intercept_schedule(
&self,
message: &str,
agent_id: &AgentId,
) -> Result<Option<ScheduleInterceptResult>> {
if !zclaw_runtime::nl_schedule::has_schedule_intent(message) {
return Ok(None);
}
let parse_result = zclaw_runtime::nl_schedule::parse_nl_schedule(message, agent_id);
match parse_result {
zclaw_runtime::nl_schedule::ScheduleParseResult::Exact(ref parsed)
if parsed.confidence >= 0.8 =>
{
let trigger_id = format!(
"sched_{}_{}",
chrono::Utc::now().timestamp_millis(),
&uuid::Uuid::new_v4().to_string()[..8]
);
let trigger_config = zclaw_hands::TriggerConfig {
id: trigger_id.clone(),
name: parsed.task_description.clone(),
hand_id: "_reminder".to_string(),
trigger_type: zclaw_hands::TriggerType::Schedule {
cron: parsed.cron_expression.clone(),
},
enabled: true,
max_executions_per_hour: 60,
};
match self.create_trigger(trigger_config).await {
Ok(_entry) => {
tracing::info!(
"[Kernel] Schedule trigger created: {} (cron: {})",
trigger_id, parsed.cron_expression
);
let confirm_msg = format!(
"已为您设置定时任务:\n\n- **任务**{}\n- **时间**{}\n- **Cron**`{}`\n\n任务已激活,将在设定时间自动执行。",
parsed.task_description,
parsed.natural_description,
parsed.cron_expression,
);
let (tx, rx) = mpsc::channel(32);
if tx.send(zclaw_runtime::LoopEvent::Delta(confirm_msg)).await.is_err() {
tracing::warn!("[Kernel] Failed to send confirm msg to channel");
}
if tx.send(zclaw_runtime::LoopEvent::Complete(
zclaw_runtime::AgentLoopResult {
response: String::new(),
input_tokens: 0,
output_tokens: 0,
iterations: 1,
}
)).await.is_err() {
tracing::warn!("[Kernel] Failed to send complete to channel");
}
drop(tx);
Ok(Some(ScheduleInterceptResult {
rx,
task_description: parsed.task_description.clone(),
natural_description: parsed.natural_description.clone(),
cron_expression: parsed.cron_expression.clone(),
}))
}
Err(e) => {
tracing::warn!(
"[Kernel] Failed to create schedule trigger, falling through to LLM: {}", e
);
Ok(None)
}
}
}
_ => {
tracing::debug!(
"[Kernel] Schedule intent detected but not confident enough, falling through to LLM"
);
Ok(None)
}
}
}
}
use zclaw_runtime::{AgentLoop, tool::builtin::PathValidator};
use super::Kernel;
use super::super::MessageResponse;
impl Kernel {
/// Send a message to an agent
pub async fn send_message(
&self,
agent_id: &AgentId,
message: String,
) -> Result<MessageResponse> {
self.send_message_with_chat_mode(agent_id, message, None, None).await
}
/// Send a message to an agent with optional chat mode configuration
pub async fn send_message_with_chat_mode(
&self,
agent_id: &AgentId,
message: String,
chat_mode: Option<ChatModeConfig>,
model_override: Option<String>,
) -> Result<MessageResponse> {
let agent_config = self.registry.get(agent_id)
.ok_or_else(|| zclaw_types::ZclawError::NotFound(format!("Agent not found: {}", agent_id)))?;
// Create or get session
let session_id = self.memory.create_session(agent_id).await?;
// Model priority: UI override > Agent config > Global config
let model = model_override
.filter(|m| !m.is_empty())
.unwrap_or_else(|| {
if !agent_config.model.model.is_empty() {
agent_config.model.model.clone()
} else {
self.config.model().to_string()
}
});
// Create agent loop with model configuration
let subagent_enabled = chat_mode.as_ref().and_then(|m| m.subagent_enabled).unwrap_or(false);
let tools = self.create_tool_registry(subagent_enabled);
self.skill_executor.set_tool_registry(tools.clone());
let mut loop_runner = AgentLoop::new(
*agent_id,
self.driver.clone(),
tools,
self.memory.clone(),
)
.with_model(&model)
.with_skill_executor(self.skill_executor.clone())
.with_hand_executor(self.hand_executor.clone())
.with_max_tokens(agent_config.max_tokens.unwrap_or_else(|| self.config.max_tokens()))
.with_temperature(agent_config.temperature.unwrap_or_else(|| self.config.temperature()))
.with_compaction_threshold(
agent_config.compaction_threshold
.map(|t| t as usize)
.unwrap_or_else(|| self.config.compaction_threshold()),
);
// Set path validator from agent's workspace directory (if configured)
if let Some(ref workspace) = agent_config.workspace {
let path_validator = PathValidator::new().with_workspace(workspace.clone());
tracing::info!(
"[Kernel] Setting path_validator with workspace: {} for agent {}",
workspace.display(),
agent_id
);
loop_runner = loop_runner.with_path_validator(path_validator);
}
// Inject middleware chain
loop_runner = loop_runner.with_middleware_chain(self.create_middleware_chain());
// Apply chat mode configuration (thinking/reasoning/plan mode)
if let Some(ref mode) = chat_mode {
if mode.thinking_enabled.unwrap_or(false) {
loop_runner = loop_runner.with_thinking_enabled(true);
}
if let Some(ref effort) = mode.reasoning_effort {
loop_runner = loop_runner.with_reasoning_effort(effort.clone());
}
if mode.plan_mode.unwrap_or(false) {
loop_runner = loop_runner.with_plan_mode(true);
}
}
// Build system prompt with skill information injected
let system_prompt = self.build_system_prompt_with_skills(
agent_config.system_prompt.as_ref(),
subagent_enabled,
).await;
let loop_runner = loop_runner.with_system_prompt(&system_prompt);
// Run the loop
let result = loop_runner.run(session_id, message).await?;
// Track message count
self.registry.increment_message_count(agent_id);
Ok(MessageResponse {
content: result.response,
input_tokens: result.input_tokens,
output_tokens: result.output_tokens,
})
}
/// Send a message with streaming
pub async fn send_message_stream(
&self,
agent_id: &AgentId,
message: String,
) -> Result<mpsc::Receiver<zclaw_runtime::LoopEvent>> {
self.send_message_stream_with_prompt(agent_id, message, None, None, None, None).await
}
/// Send a message with streaming, optional system prompt, optional session reuse,
/// and optional chat mode configuration (thinking/reasoning/plan mode).
pub async fn send_message_stream_with_prompt(
&self,
agent_id: &AgentId,
message: String,
system_prompt_override: Option<String>,
session_id_override: Option<zclaw_types::SessionId>,
chat_mode: Option<ChatModeConfig>,
model_override: Option<String>,
) -> Result<mpsc::Receiver<zclaw_runtime::LoopEvent>> {
let agent_config = self.registry.get(agent_id)
.ok_or_else(|| zclaw_types::ZclawError::NotFound(format!("Agent not found: {}", agent_id)))?;
// Reuse existing session or create new one
let session_id = match session_id_override {
Some(id) => {
// Use get_or_create to ensure the frontend's session ID is persisted.
// This is the critical bridge: without it, the kernel generates a
// different UUID each turn, so conversation history is never found.
tracing::debug!("Reusing frontend session ID: {}", id);
self.memory.get_or_create_session(&id, agent_id).await?
}
None => self.memory.create_session(agent_id).await?,
};
// Model priority: UI override > Agent config > Global config
let model = model_override
.filter(|m| !m.is_empty())
.unwrap_or_else(|| {
if !agent_config.model.model.is_empty() {
agent_config.model.model.clone()
} else {
self.config.model().to_string()
}
});
// Create agent loop with model configuration
let subagent_enabled = chat_mode.as_ref().and_then(|m| m.subagent_enabled).unwrap_or(false);
let tools = self.create_tool_registry(subagent_enabled);
self.skill_executor.set_tool_registry(tools.clone());
let mut loop_runner = AgentLoop::new(
*agent_id,
self.driver.clone(),
tools,
self.memory.clone(),
)
.with_model(&model)
.with_skill_executor(self.skill_executor.clone())
.with_hand_executor(self.hand_executor.clone())
.with_max_tokens(agent_config.max_tokens.unwrap_or_else(|| self.config.max_tokens()))
.with_temperature(agent_config.temperature.unwrap_or_else(|| self.config.temperature()))
.with_compaction_threshold(
agent_config.compaction_threshold
.map(|t| t as usize)
.unwrap_or_else(|| self.config.compaction_threshold()),
);
// Set path validator from agent's workspace directory (if configured)
// This enables file_read / file_write tools to access the workspace
if let Some(ref workspace) = agent_config.workspace {
let path_validator = PathValidator::new().with_workspace(workspace.clone());
tracing::info!(
"[Kernel] Setting path_validator with workspace: {} for agent {}",
workspace.display(),
agent_id
);
loop_runner = loop_runner.with_path_validator(path_validator);
}
// Inject middleware chain
loop_runner = loop_runner.with_middleware_chain(self.create_middleware_chain());
// Apply chat mode configuration (thinking/reasoning/plan mode from frontend)
if let Some(ref mode) = chat_mode {
if mode.thinking_enabled.unwrap_or(false) {
loop_runner = loop_runner.with_thinking_enabled(true);
}
if let Some(ref effort) = mode.reasoning_effort {
loop_runner = loop_runner.with_reasoning_effort(effort.clone());
}
if mode.plan_mode.unwrap_or(false) {
loop_runner = loop_runner.with_plan_mode(true);
}
}
// Use external prompt if provided, otherwise build default
let system_prompt = match system_prompt_override {
Some(prompt) => prompt,
None => self.build_system_prompt_with_skills(
agent_config.system_prompt.as_ref(),
subagent_enabled,
).await,
};
let loop_runner = loop_runner.with_system_prompt(&system_prompt);
// Run with streaming
self.registry.increment_message_count(agent_id);
loop_runner.run_streaming(session_id, message).await
}
/// Build a system prompt with skill information injected.
/// When `subagent_enabled` is true, adds sub-agent delegation instructions.
pub(super) async fn build_system_prompt_with_skills(
&self,
base_prompt: Option<&String>,
subagent_enabled: bool,
) -> String {
// Get skill list asynchronously
let skills = self.skills.list().await;
let mut prompt = base_prompt
.map(|p| p.clone())
.unwrap_or_else(|| "You are a helpful AI assistant.".to_string());
// Progressive skill loading (DeerFlow pattern):
// If the SkillIndexMiddleware is registered in the middleware chain,
// it will inject a lightweight index at priority 200.
// We still inject a basic instruction block here for when middleware is not active.
//
// When middleware IS active, avoid duplicate injection by only keeping
// the skill-use instructions (not the full list).
let skill_index_active = {
use zclaw_runtime::tool::SkillExecutor;
!self.skill_executor.list_skill_index().is_empty()
};
if !skills.is_empty() {
if skill_index_active {
// Middleware will inject the index — only add usage instructions
prompt.push_str("\n\n## Skills\n\n");
prompt.push_str("You have access to specialized skills listed in the skill index above. ");
prompt.push_str("Analyze user intent and autonomously call `skill_load` to inspect a skill, ");
prompt.push_str("then `execute_skill` with the appropriate skill_id.\n\n");
prompt.push_str("- **IMPORTANT**: Autonomously decide when to use skills based on user intent.\n");
prompt.push_str("- Do not wait for explicit skill names — recognize the need and act.\n");
prompt.push_str("- If unsure about a skill, call `skill_load` first to understand its parameters.\n");
} else {
// No middleware — inject full skill list as fallback
prompt.push_str("\n\n## Available Skills\n\n");
prompt.push_str("You have access to specialized skills. Analyze user intent and autonomously call `execute_skill` with the appropriate skill_id.\n\n");
let categories = self.categorize_skills(&skills);
for (category, category_skills) in categories {
prompt.push_str(&format!("### {}\n", category));
for skill in category_skills {
prompt.push_str(&format!(
"- **{}**: {}",
skill.id.as_str(),
skill.description
));
prompt.push('\n');
}
prompt.push('\n');
}
prompt.push_str("### When to use skills:\n");
prompt.push_str("- **IMPORTANT**: You should autonomously decide when to use skills based on your understanding of the user's intent.\n");
prompt.push_str("- Do not wait for explicit skill names - recognize the need and act.\n");
prompt.push_str("- Match user's request to the most appropriate skill's domain.\n\n");
prompt.push_str("### Example:\n");
prompt.push_str("User: 分析腾讯财报 -> Intent: Financial analysis -> Call: execute_skill(\"finance-tracker\", {...})\n");
}
}
// Sub-agent delegation instructions (Ultra mode only)
if subagent_enabled {
prompt.push_str("\n\n## Sub-Agent Delegation\n\n");
prompt.push_str("You can delegate complex sub-tasks to sub-agents using the `task` tool. This enables parallel execution of independent work.\n\n");
prompt.push_str("### When to use sub-agents:\n");
prompt.push_str("- Complex tasks that can be decomposed into independent parallel sub-tasks\n");
prompt.push_str("- Research tasks requiring multiple independent searches\n");
prompt.push_str("- Tasks requiring different expertise areas simultaneously\n\n");
prompt.push_str("### Guidelines:\n");
prompt.push_str("- Break complex work into clear, self-contained sub-tasks\n");
prompt.push_str("- Each sub-task should have a clear objective and expected output\n");
prompt.push_str("- Synthesize sub-agent results into a coherent final response\n");
prompt.push_str("- Maximum 3 concurrent sub-agents — batch if more are needed\n");
}
// Clarification system — always enabled
prompt.push_str("\n\n## Clarification System\n\n");
prompt.push_str("When you encounter any of the following situations, call `ask_clarification` to ask the user BEFORE proceeding:\n\n");
prompt.push_str("- **Missing information**: User's request is critical details you you need but don't have\n");
prompt.push_str("- **Ambiguous requirement**: Multiple valid interpretations exist\n");
prompt.push_str("- **Approach choice**: Several approaches with different trade-offs\n");
prompt.push_str("- **Risk confirmation**: Action could have significant consequences\n\n");
prompt.push_str("### Guidelines:\n");
prompt.push_str("- ALWAYS prefer asking over guessing\n");
prompt.push_str("- Provide clear options when possible\n");
prompt.push_str("- Include brief context about why you're asking\n");
prompt.push_str("- After receiving clarification, proceed immediately\n");
prompt
}
/// Categorize skills into logical groups
///
/// Priority:
/// 1. Use skill's `category` field if defined in SKILL.md
/// 2. Fall back to pattern matching for backward compatibility
pub(super) fn categorize_skills<'a>(&self, skills: &'a [zclaw_skills::SkillManifest]) -> Vec<(String, Vec<&'a zclaw_skills::SkillManifest>)> {
let mut categories: std::collections::HashMap<String, Vec<&zclaw_skills::SkillManifest>> = std::collections::HashMap::new();
// Fallback category patterns for skills without explicit category
let fallback_patterns = [
("开发工程", vec!["senior-developer", "frontend-developer", "backend-architect", "ai-engineer", "devops-automator", "rapid-prototyper", "lsp-index-engineer"]),
("测试质量", vec!["api-tester", "evidence-collector", "reality-checker", "performance-benchmarker", "test-results-analyzer", "accessibility-auditor", "code-review"]),
("安全合规", vec!["security-engineer", "legal-compliance-checker", "agentic-identity-trust"]),
("数据分析", vec!["analytics-reporter", "finance-tracker", "data-analysis", "sales-data-extraction-agent", "data-consolidation-agent", "report-distribution-agent"]),
("项目管理", vec!["senior-pm", "project-shepherd", "sprint-prioritizer", "experiment-tracker", "feedback-synthesizer", "trend-researcher", "agents-orchestrator"]),
("设计UX", vec!["ui-designer", "ux-architect", "ux-researcher", "visual-storyteller", "image-prompt-engineer", "whimsy-injector", "brand-guardian"]),
("内容营销", vec!["content-creator", "chinese-writing", "executive-summary-generator", "social-media-strategist"]),
("社交平台", vec!["twitter-engager", "instagram-curator", "tiktok-strategist", "reddit-community-builder", "zhihu-strategist", "xiaohongshu-specialist", "wechat-official-account", "growth-hacker", "app-store-optimizer"]),
("运营支持", vec!["studio-operations", "studio-producer", "support-responder", "workflow-optimizer", "infrastructure-maintainer", "tool-evaluator"]),
("XR/空间计算", vec!["visionos-spatial-engineer", "macos-spatial-metal-engineer", "xr-immersive-developer", "xr-interface-architect", "xr-cockpit-interaction-specialist", "terminal-integration-specialist"]),
("基础工具", vec!["web-search", "file-operations", "shell-command", "git", "translation", "feishu-docs"]),
];
// Categorize each skill
for skill in skills {
// Priority 1: Use skill's explicit category
if let Some(ref category) = skill.category {
if !category.is_empty() {
categories.entry(category.clone()).or_default().push(skill);
continue;
}
}
// Priority 2: Fallback to pattern matching
let skill_id = skill.id.as_str();
let mut categorized = false;
for (category, patterns) in &fallback_patterns {
if patterns.iter().any(|p| skill_id.contains(p) || *p == skill_id) {
categories.entry(category.to_string()).or_default().push(skill);
categorized = true;
break;
}
}
// Put uncategorized skills in "其他"
if !categorized {
categories.entry("其他".to_string()).or_default().push(skill);
}
}
// Convert to ordered vector
let mut result: Vec<(String, Vec<_>)> = categories.into_iter().collect();
result.sort_by(|a, b| {
// Sort by predefined order
let order = ["开发工程", "测试质量", "安全合规", "数据分析", "项目管理", "设计UX", "内容营销", "社交平台", "运营支持", "XR/空间计算", "基础工具", "其他"];
let a_idx = order.iter().position(|&x| x == a.0).unwrap_or(99);
let b_idx = order.iter().position(|&x| x == b.0).unwrap_or(99);
a_idx.cmp(&b_idx)
});
result
}
}