Files
zclaw_openfang/crates/zclaw-kernel/src/director.rs
iven f3fb5340b5 fix: 发布前审计 Batch 1 — Pipeline 内存泄漏/超时 + Director 死锁 + Rate Limit Worker
Pipeline executor:
- 添加 cleanup() 方法,MAX_COMPLETED_RUNS=100 上限淘汰旧记录
- 每步执行添加 tokio::time::timeout(使用 PipelineSpec.timeout_secs,默认 300s)
- Delay ms 上限 60000,超出 warn 并截断

Director send_to_agent:
- 重构为 oneshot::channel 响应模式,避免 inbox + pending_requests 锁竞争
- 添加 ensure_inbox_reader() 独立任务分发响应到对应 oneshot sender

cleanup_rate_limit Worker:
- 实现 Worker body: DELETE FROM rate_limit_events WHERE created_at < NOW() - INTERVAL '1 hour'

651 tests passed, 0 failed
2026-04-18 14:09:16 +08:00

1262 lines
42 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

//! Director - Multi-Agent Orchestration (Experimental)
//!
//! The Director manages multi-agent conversations by:
//! - Determining which agent speaks next
//! - Managing conversation state and turn order
//! - Supporting multiple scheduling strategies
//! - Coordinating agent responses
//!
//! **Status**: This module is enabled by default via the `multi-agent` feature in the
//! desktop build. The Director orchestrates butler delegation, task decomposition, and
//! expert agent assignment through `butler_delegate()`.
use std::sync::Arc;
use serde::{Deserialize, Serialize};
use tokio::sync::{RwLock, Mutex, mpsc, oneshot};
use zclaw_types::{AgentId, Result, ZclawError};
use zclaw_protocols::{A2aEnvelope, A2aMessageType, A2aRecipient, A2aRouter, A2aAgentProfile, A2aCapability};
use zclaw_runtime::{LlmDriver, CompletionRequest};
/// Director configuration
///
/// Controls turn budget, speaker scheduling, and per-agent response timeout.
/// See [`DirectorConfig::default`] for the defaults used by `Director::new`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DirectorConfig {
/// Maximum turns before ending conversation (checked by `select_next_speaker`
/// and `run_conversation`)
pub max_turns: usize,
/// Scheduling strategy used to pick the next speaker
pub strategy: ScheduleStrategy,
/// Whether to include user in the loop
/// NOTE(review): currently only stored; `run_conversation` has a TODO where
/// user input would be awaited — confirm intended semantics.
pub include_user: bool,
/// Timeout for agent response (seconds), enforced in `send_to_agent`
pub response_timeout: u64,
/// Whether to allow parallel agent responses
/// NOTE(review): not yet consulted anywhere in this file — verify before relying on it.
pub allow_parallel: bool,
}
impl Default for DirectorConfig {
fn default() -> Self {
Self {
max_turns: 50,
strategy: ScheduleStrategy::Priority,
include_user: true,
response_timeout: 30,
allow_parallel: false,
}
}
}
/// Scheduling strategy for determining next speaker
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ScheduleStrategy {
/// Round-robin through all agents (index = turn % active-agent count)
RoundRobin,
/// Priority-based selection (higher priority speaks first, subject to
/// each agent's consecutive-turn cap)
Priority,
/// LLM decides who speaks next (falls back to highest priority on error)
LlmDecision,
/// Random selection (seeded from the wall clock, not a real RNG)
Random,
/// Manual (external controller decides; `select_next_speaker` returns None)
Manual,
}
/// Agent role in the conversation; each role maps to a default priority
/// via [`AgentRole::default_priority`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
#[serde(rename_all = "snake_case")]
pub enum AgentRole {
/// Main teacher/instructor
Teacher,
/// Teaching assistant
Assistant,
/// Student participant
Student,
/// Moderator/facilitator
Moderator,
/// Expert consultant (targeted by butler delegation)
Expert,
/// Observer (receives messages but doesn't speak)
Observer,
}
impl AgentRole {
/// Get default priority for this role
pub fn default_priority(&self) -> u8 {
match self {
AgentRole::Teacher => 10,
AgentRole::Moderator => 9,
AgentRole::Expert => 8,
AgentRole::Assistant => 7,
AgentRole::Student => 5,
AgentRole::Observer => 0,
}
}
}
/// Agent configuration for director
///
/// A participant in a director-managed conversation. Priority defaults to
/// the role's `default_priority()` in [`DirectorAgent::new`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DirectorAgent {
/// Agent ID
pub id: AgentId,
/// Display name
pub name: String,
/// Agent role
pub role: AgentRole,
/// Priority (higher = speaks first)
pub priority: u8,
/// System prompt / persona; also matched against task categories by
/// `assign_experts` during butler delegation
pub persona: String,
/// Whether this agent is active (inactive agents are never scheduled)
pub active: bool,
/// Maximum turns this agent can speak consecutively (Priority strategy)
pub max_consecutive_turns: usize,
}
impl DirectorAgent {
    /// Construct an active agent whose priority is derived from its role
    /// and that may speak at most two consecutive turns.
    pub fn new(id: AgentId, name: impl Into<String>, role: AgentRole, persona: impl Into<String>) -> Self {
        Self {
            priority: role.default_priority(),
            id,
            name: name.into(),
            persona: persona.into(),
            role,
            active: true,
            max_consecutive_turns: 2,
        }
    }
}
/// Conversation state
///
/// Mutable bookkeeping for one conversation: turn counter, who spoke last
/// and for how many consecutive turns, plus a (speaker, summary) history.
#[derive(Debug, Clone, Default)]
pub struct ConversationState {
/// Current turn number (incremented by `record_turn`)
pub turn: usize,
/// Current speaker ID
pub current_speaker: Option<AgentId>,
/// Turn history (agent_id, message_summary)
pub history: Vec<(AgentId, String)>,
/// Consecutive turns by current agent (resets to 1 on speaker change)
pub consecutive_turns: usize,
/// Whether conversation is active
pub active: bool,
/// Conversation topic/goal
pub topic: Option<String>,
}
impl ConversationState {
    /// Fresh, inactive conversation with no history.
    pub fn new() -> Self {
        // The derived Default produces exactly the zero/empty/false values
        // the original constructor spelled out field by field.
        Self::default()
    }
    /// Append a turn: track consecutive turns by the same speaker, record
    /// the (speaker, summary) pair, and advance the turn counter.
    pub fn record_turn(&mut self, agent_id: AgentId, summary: String) {
        match self.current_speaker {
            Some(current) if current == agent_id => self.consecutive_turns += 1,
            _ => {
                self.current_speaker = Some(agent_id);
                self.consecutive_turns = 1;
            }
        }
        self.history.push((agent_id, summary));
        self.turn += 1;
    }
    /// Return at most the last `n` (agent, summary) entries.
    pub fn get_recent_history(&self, n: usize) -> &[(AgentId, String)] {
        let len = self.history.len();
        &self.history[len.saturating_sub(n)..]
    }
    /// True when `agent_id` is the current speaker and has already used
    /// `max` (or more) consecutive turns.
    pub fn is_over_consecutive_limit(&self, agent_id: &AgentId, max: usize) -> bool {
        self.current_speaker == Some(*agent_id) && self.consecutive_turns >= max
    }
}
/// The Director orchestrates multi-agent conversations
///
/// All mutable pieces live behind `Arc<RwLock/Mutex>` so `&self` methods
/// can be shared across tasks.
pub struct Director {
/// Director configuration
config: DirectorConfig,
/// Registered agents, kept sorted by descending priority (see `register_agent`)
agents: Arc<RwLock<Vec<DirectorAgent>>>,
/// Conversation state
state: Arc<RwLock<ConversationState>>,
/// A2A router for messaging
router: Arc<A2aRouter>,
/// Agent ID for the director itself (sender of all outgoing envelopes)
director_id: AgentId,
/// Optional LLM driver for intelligent scheduling
llm_driver: Option<Arc<dyn LlmDriver>>,
/// Pending request response channels (request_id → oneshot sender),
/// drained by the inbox reader task spawned in `ensure_inbox_reader`
pending_requests: Arc<Mutex<std::collections::HashMap<String, oneshot::Sender<A2aEnvelope>>>>,
/// Receiver for incoming messages; filled by `initialize()` and taken
/// (exactly once) by the inbox reader task
inbox: Arc<Mutex<Option<mpsc::Receiver<A2aEnvelope>>>>,
}
impl Director {
/// Create a new director with a fresh identity and its own A2A router
/// registered under that identity.
pub fn new(config: DirectorConfig) -> Self {
    let director_id = AgentId::new();
    Self {
        router: Arc::new(A2aRouter::new(director_id.clone())),
        agents: Arc::new(RwLock::new(Vec::new())),
        state: Arc::new(RwLock::new(ConversationState::new())),
        pending_requests: Arc::new(Mutex::new(std::collections::HashMap::new())),
        inbox: Arc::new(Mutex::new(None)),
        llm_driver: None,
        director_id,
        config,
    }
}
/// Create a director that shares an existing router (the director still
/// gets its own fresh `AgentId`).
pub fn with_router(config: DirectorConfig, router: Arc<A2aRouter>) -> Self {
    Self {
        director_id: AgentId::new(),
        agents: Arc::new(RwLock::new(Vec::new())),
        state: Arc::new(RwLock::new(ConversationState::new())),
        pending_requests: Arc::new(Mutex::new(std::collections::HashMap::new())),
        inbox: Arc::new(Mutex::new(None)),
        llm_driver: None,
        router,
        config,
    }
}
/// Initialize the director's inbox (must be called after creation)
///
/// Registers the director with the A2A router under an "orchestration"
/// capability profile and parks the returned message receiver in `inbox`,
/// where `ensure_inbox_reader` later takes it to spawn the dispatch task.
/// Calling this again replaces the stored receiver.
pub async fn initialize(&self) -> Result<()> {
let profile = A2aAgentProfile {
id: self.director_id.clone(),
name: "Director".to_string(),
description: "Multi-agent conversation orchestrator".to_string(),
capabilities: vec![A2aCapability {
name: "orchestration".to_string(),
description: "Multi-agent conversation management".to_string(),
input_schema: None,
output_schema: None,
requires_approval: false,
version: "1.0.0".to_string(),
tags: vec!["orchestration".to_string()],
}],
protocols: vec!["a2a".to_string()],
role: "orchestrator".to_string(),
priority: 10,
metadata: Default::default(),
groups: vec![],
// 0 = "never seen"; presumably the router refreshes this on
// registration — NOTE(review): confirm against A2aRouter.
last_seen: 0,
};
let rx = self.router.register_agent(profile).await;
*self.inbox.lock().await = Some(rx);
Ok(())
}
/// Set LLM driver for intelligent scheduling (builder-style; consumes self)
pub fn with_llm_driver(mut self, driver: Arc<dyn LlmDriver>) -> Self {
self.llm_driver = Some(driver);
self
}
/// Set LLM driver (mutable in-place variant of `with_llm_driver`)
pub fn set_llm_driver(&mut self, driver: Arc<dyn LlmDriver>) {
self.llm_driver = Some(driver);
}
/// Register an agent and keep the roster ordered by descending priority
/// (stable sort, so equal priorities keep registration order).
pub async fn register_agent(&self, agent: DirectorAgent) {
    let mut roster = self.agents.write().await;
    roster.push(agent);
    roster.sort_by_key(|a| std::cmp::Reverse(a.priority));
}
/// Remove the agent with the given ID from the roster (no-op if absent).
pub async fn remove_agent(&self, agent_id: &AgentId) {
    self.agents.write().await.retain(|a| a.id != *agent_id);
}
/// Snapshot of every registered agent (active or not), in priority order.
pub async fn get_agents(&self) -> Vec<DirectorAgent> {
    let roster = self.agents.read().await;
    roster.clone()
}
/// Snapshot of the active agents only, preserving priority order.
pub async fn get_active_agents(&self) -> Vec<DirectorAgent> {
    let roster = self.agents.read().await;
    roster.iter().filter(|a| a.active).cloned().collect()
}
/// Start a new conversation
pub async fn start_conversation(&self, topic: Option<String>) {
let mut state = self.state.write().await;
state.turn = 0;
state.current_speaker = None;
state.history.clear();
state.consecutive_turns = 0;
state.active = true;
state.topic = topic;
}
/// End the conversation (history and counters are preserved; only the
/// `active` flag is cleared)
pub async fn end_conversation(&self) {
let mut state = self.state.write().await;
state.active = false;
}
/// Get a clone of the current conversation state
pub async fn get_state(&self) -> ConversationState {
self.state.read().await.clone()
}
/// Select the next speaker according to the configured strategy.
///
/// Returns `None` when no agents are active, the turn budget is spent,
/// or the strategy is `Manual` (an external controller decides).
pub async fn select_next_speaker(&self) -> Option<DirectorAgent> {
    let candidates = self.get_active_agents().await;
    let state = self.state.read().await;
    if candidates.is_empty() || state.turn >= self.config.max_turns {
        return None;
    }
    match self.config.strategy {
        ScheduleStrategy::RoundRobin => {
            // Deterministic cycle through the active roster.
            candidates.get(state.turn % candidates.len()).cloned()
        }
        ScheduleStrategy::Priority => {
            // Highest-priority agent that hasn't hit its consecutive-turn
            // cap; if everyone has, fall back to the top-priority agent.
            candidates
                .iter()
                .find(|a| !state.is_over_consecutive_limit(&a.id, a.max_consecutive_turns))
                .or_else(|| candidates.first())
                .cloned()
        }
        ScheduleStrategy::Random => {
            // Cheap pseudo-randomness from the wall clock's nanoseconds.
            use std::time::{SystemTime, UNIX_EPOCH};
            let nanos = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_nanos();
            candidates.get(nanos as usize % candidates.len()).cloned()
        }
        ScheduleStrategy::LlmDecision => {
            // Ask the LLM; fall back to the top-priority agent on failure.
            self.select_speaker_with_llm(&candidates, &state)
                .await
                .or_else(|| candidates.first().cloned())
        }
        // External controller picks the speaker.
        ScheduleStrategy::Manual => None,
    }
}
/// Use the LLM to select the next speaker.
///
/// Builds a prompt describing the agents, topic, and recent history, asks
/// the model for a single 1-based index, and returns the matching agent.
/// Returns `None` only when no driver is configured; any model error or
/// unparseable reply falls back to the first (highest priority) agent.
async fn select_speaker_with_llm(
    &self,
    agents: &[DirectorAgent],
    state: &ConversationState,
) -> Option<DirectorAgent> {
    let driver = self.llm_driver.as_ref()?;
    // Numbered roster the model picks from (1-based, matching the prompt).
    let agent_descriptions: String = agents
        .iter()
        .enumerate()
        .map(|(i, a)| format!("{}. {} ({}) - {}", i + 1, a.name, a.role.as_str(), a.persona))
        .collect::<Vec<_>>()
        .join("\n");
    let recent_history: String = state
        .get_recent_history(5)
        .iter()
        .map(|(id, msg)| {
            let name = agents
                .iter()
                .find(|a| &a.id == id)
                .map(|a| a.name.as_str())
                .unwrap_or("Unknown");
            format!("- {}: {}", name, msg)
        })
        .collect::<Vec<_>>()
        .join("\n");
    let topic = state.topic.as_deref().unwrap_or("General discussion");
    // Resolve the last speaker's name up front as a &str. The original
    // used `unwrap_or(&"None".to_string())`, allocating a temporary
    // String on every call even when a speaker existed.
    let last_speaker = state
        .current_speaker
        .and_then(|id| agents.iter().find(|a| a.id == id))
        .map(|a| a.name.as_str())
        .unwrap_or("None");
    let prompt = format!(
        r#"You are a conversation director. Select the best agent to speak next.
Topic: {}
Available Agents:
{}
Recent Conversation:
{}
Current turn: {}
Last speaker: {}
Instructions:
1. Consider the conversation flow and topic
2. Choose the agent who should speak next to advance the conversation
3. Avoid having the same agent speak too many times consecutively
4. Consider which role would be most valuable at this point
Respond with ONLY the number (1-{}) of the agent who should speak next. No explanation."#,
        topic,
        agent_descriptions,
        recent_history,
        state.turn,
        last_speaker,
        agents.len()
    );
    // Tiny token budget + low temperature: we only want a single number.
    let request = CompletionRequest {
        model: "default".to_string(),
        system: Some("You are a conversation director. You respond with only a single number.".to_string()),
        messages: vec![zclaw_types::Message::User { content: prompt }],
        tools: vec![],
        max_tokens: Some(10),
        temperature: Some(0.3),
        stop: vec![],
        stream: false,
        thinking_enabled: false,
        reasoning_effort: None,
        plan_mode: false,
    };
    match driver.complete(request).await {
        Ok(response) => {
            // Concatenate all text blocks of the reply.
            let text = response.content.iter()
                .filter_map(|block| match block {
                    zclaw_runtime::ContentBlock::Text { text } => Some(text.clone()),
                    _ => None,
                })
                .collect::<Vec<_>>()
                .join("");
            // Parse the 1-based index; reject anything out of range.
            if let Ok(idx) = text.trim().parse::<usize>() {
                if (1..=agents.len()).contains(&idx) {
                    return Some(agents[idx - 1].clone());
                }
            }
            // Unparseable reply: fall back to the first agent.
            agents.first().cloned()
        }
        Err(e) => {
            tracing::warn!("LLM speaker selection failed: {}", e);
            agents.first().cloned()
        }
    }
}
/// Send a message to the selected agent and wait for its reply.
///
/// Uses a per-request `oneshot` channel so concurrent calls never contend
/// on a shared response slot: the inbox reader task dispatches each
/// incoming `Response` envelope (matched via `reply_to`) to the right
/// sender.
///
/// # Errors
/// Returns `ZclawError::Timeout` when the agent does not answer within
/// `config.response_timeout` seconds, and propagates routing errors
/// (cleaning up the pending-request entry in that case — the original
/// leaked the entry when `route()` failed).
pub async fn send_to_agent(
    &self,
    agent: &DirectorAgent,
    message: String,
) -> Result<String> {
    // Dedicated response channel for this specific request.
    let (response_tx, response_rx) = oneshot::channel::<A2aEnvelope>();
    let envelope = A2aEnvelope::new(
        self.director_id.clone(),
        A2aRecipient::Direct { agent_id: agent.id.clone() },
        A2aMessageType::Request,
        serde_json::json!({
            "message": message,
            "persona": agent.persona.clone(),
            "role": agent.role.clone(),
        }),
    );
    // Register the sender before routing so a fast reply can't be missed.
    let request_id = envelope.id.clone();
    self.pending_requests.lock().await.insert(request_id.clone(), response_tx);
    // Make sure the dispatch task is consuming the inbox.
    self.ensure_inbox_reader().await;
    // Send the request. On routing failure, remove the pending entry so
    // the map does not accumulate senders that can never be resolved.
    if let Err(e) = self.router.route(envelope).await {
        self.pending_requests.lock().await.remove(&request_id);
        return Err(e);
    }
    // Await the reply on our dedicated channel, bounded by the timeout.
    let timeout_duration = std::time::Duration::from_secs(self.config.response_timeout);
    let response = tokio::time::timeout(timeout_duration, response_rx).await;
    // Always clean up; on success the inbox reader already removed the
    // entry, so this is a harmless no-op there.
    self.pending_requests.lock().await.remove(&request_id);
    match response {
        Ok(Ok(envelope)) => {
            // Prefer the agent's "response" field; otherwise synthesize a
            // placeholder. Built lazily — the original's `unwrap_or(&format!…)`
            // allocated the placeholder on every call.
            let response_text = envelope
                .payload
                .get("response")
                .and_then(|v| v.as_str())
                .map(str::to_string)
                .unwrap_or_else(|| {
                    format!("[{}] Response from {}", agent.role.as_str(), agent.name)
                });
            Ok(response_text)
        }
        // Sender dropped without a reply (reader shut down or entry lost).
        Ok(Err(_)) => Err(ZclawError::Timeout("No response received".into())),
        // The timeout elapsed first.
        Err(_) => Err(ZclawError::Timeout(format!(
            "Agent {} did not respond within {} seconds",
            agent.name, self.config.response_timeout
        ))),
    }
}
/// Ensure the inbox reader task is running.
///
/// The reader continuously drains the shared inbox and routes each
/// `Response` envelope to the oneshot sender registered under its
/// `reply_to` id; non-response traffic (notifications, etc.) is dropped.
///
/// The receiver is taken from the `inbox` slot under a single lock, so at
/// most one caller ever spawns the task; every later call observes `None`
/// and returns immediately. (The original performed a redundant
/// check-then-take across two separate lock acquisitions; a single
/// `take()` is both simpler and race-free.)
async fn ensure_inbox_reader(&self) {
    // `None` means the reader already runs — or `initialize()` was never
    // called, in which case there is nothing to read yet.
    let rx = self.inbox.lock().await.take();
    if let Some(mut rx) = rx {
        let pending = self.pending_requests.clone();
        tokio::spawn(async move {
            while let Some(msg) = rx.recv().await {
                if msg.message_type == A2aMessageType::Response {
                    if let Some(ref reply_to) = msg.reply_to {
                        // Remove the matching sender and deliver. If the
                        // requester timed out and dropped its receiver,
                        // the failed send is harmless.
                        let sender = pending.lock().await.remove(reply_to);
                        if let Some(sender) = sender {
                            let _ = sender.send(msg);
                        }
                    }
                }
                // Non-response messages are dropped (notifications, etc.)
            }
        });
    }
}
/// Broadcast a notification message to every registered agent.
pub async fn broadcast(&self, message: String) -> Result<()> {
    let payload = serde_json::json!({ "message": message });
    let envelope = A2aEnvelope::new(
        self.director_id,
        A2aRecipient::Broadcast,
        A2aMessageType::Notification,
        payload,
    );
    self.router.route(envelope).await
}
/// Run one turn of the conversation.
///
/// Selects the next speaker, sends it the built context, and records a
/// (possibly truncated) summary of its response. Returns the speaker, or
/// `None` when no speaker was selected (e.g. `Manual` strategy).
///
/// # Errors
/// Fails when the conversation is not active or the agent call fails.
pub async fn run_turn(&self, input: Option<String>) -> Result<Option<DirectorAgent>> {
    {
        let state = self.state.read().await;
        if !state.active {
            return Err(ZclawError::InvalidInput("Conversation not active".into()));
        }
    }
    // Select next speaker.
    let speaker = self.select_next_speaker().await;
    if let Some(ref agent) = speaker {
        // Build context inside a scope so the read guard is RELEASED
        // before we take the write lock below. The original only
        // *shadowed* the read guard (`let state = …read(); … let mut
        // state = …write();`), leaving the read guard alive in the same
        // task while awaiting write() — a guaranteed self-deadlock on
        // tokio's RwLock.
        let context = {
            let state = self.state.read().await;
            Self::build_context(&state, &input)
        };
        // Send message to agent.
        let response = self.send_to_agent(agent, context).await?;
        // Truncate the summary at a *character* boundary: the original
        // sliced `&response[..100]` by bytes, which panics whenever byte
        // 100 falls inside a multi-byte UTF-8 sequence (e.g. Chinese
        // responses, which this system routinely produces).
        let summary = match response.char_indices().nth(100) {
            Some((byte_idx, _)) => format!("{}...", &response[..byte_idx]),
            None => response,
        };
        let mut state = self.state.write().await;
        state.record_turn(agent.id, summary);
    }
    Ok(speaker)
}
/// Render the prompt context for the next speaker: optional topic,
/// optional user input, then up to the last five turns of history.
fn build_context(state: &ConversationState, input: &Option<String>) -> String {
    let mut out = String::new();
    if let Some(ref topic) = state.topic {
        out.push_str(&format!("Topic: {}\n\n", topic));
    }
    if let Some(ref user_input) = input {
        out.push_str(&format!("User: {}\n\n", user_input));
    }
    if state.history.is_empty() {
        return out;
    }
    out.push_str("Recent conversation:\n");
    for (agent_id, summary) in state.get_recent_history(5) {
        out.push_str(&format!("- {}: {}\n", agent_id, summary));
    }
    out
}
/// Run full conversation until complete
///
/// Repeats `run_turn` until the turn budget is exhausted, the conversation
/// is deactivated, no speaker is selected (e.g. `Manual` strategy), or a
/// turn fails. `initial_input` is forwarded to the first turn only (via
/// `input.take()`). Always deactivates the conversation before returning.
///
/// Returns the accumulated (speaker, summary) pairs, one per turn.
pub async fn run_conversation(
&self,
topic: String,
initial_input: Option<String>,
) -> Result<Vec<(AgentId, String)>> {
self.start_conversation(Some(topic.clone())).await;
let mut input = initial_input;
let mut results = Vec::new();
loop {
let state = self.state.read().await;
// Check termination conditions
if state.turn >= self.config.max_turns {
break;
}
if !state.active {
break;
}
// Release the read lock before run_turn re-locks the state.
drop(state);
// Run one turn
match self.run_turn(input.take()).await {
Ok(Some(_agent)) => {
// Copy the turn that run_turn just recorded into the results.
let state = self.state.read().await;
if let Some((agent_id, summary)) = state.history.last() {
results.push((*agent_id, summary.clone()));
}
}
Ok(None) => {
// Manual mode or no speaker selected
break;
}
Err(e) => {
// Turn failures end the conversation rather than propagating.
tracing::error!("Turn error: {}", e);
break;
}
}
// In a real implementation, we would wait for user input here
// if config.include_user is true
}
self.end_conversation().await;
Ok(results)
}
/// Get the director's agent ID (the identity it registers with the
/// router and uses as sender on every outgoing envelope)
pub fn director_id(&self) -> &AgentId {
&self.director_id
}
}
impl AgentRole {
/// Get role as string
pub fn as_str(&self) -> &'static str {
match self {
AgentRole::Teacher => "teacher",
AgentRole::Assistant => "assistant",
AgentRole::Student => "student",
AgentRole::Moderator => "moderator",
AgentRole::Expert => "expert",
AgentRole::Observer => "observer",
}
}
/// Parse role from string
pub fn from_str(s: &str) -> Option<Self> {
match s.to_lowercase().as_str() {
"teacher" | "instructor" => Some(AgentRole::Teacher),
"assistant" | "ta" => Some(AgentRole::Assistant),
"student" => Some(AgentRole::Student),
"moderator" | "facilitator" => Some(AgentRole::Moderator),
"expert" | "consultant" => Some(AgentRole::Expert),
"observer" => Some(AgentRole::Observer),
_ => None,
}
}
}
/// Builder for creating director configurations
///
/// Collects config tweaks and agents, then `build()` constructs the
/// `Director` and registers every queued agent.
pub struct DirectorBuilder {
// Accumulated configuration (starts from DirectorConfig::default())
config: DirectorConfig,
// Agents queued for registration at build() time
agents: Vec<DirectorAgent>,
}
impl DirectorBuilder {
/// Create a new builder
pub fn new() -> Self {
Self {
config: DirectorConfig::default(),
agents: Vec::new(),
}
}
/// Set scheduling strategy
pub fn strategy(mut self, strategy: ScheduleStrategy) -> Self {
self.config.strategy = strategy;
self
}
/// Set max turns
pub fn max_turns(mut self, max_turns: usize) -> Self {
self.config.max_turns = max_turns;
self
}
/// Include user in conversation
pub fn include_user(mut self, include: bool) -> Self {
self.config.include_user = include;
self
}
/// Add a teacher agent
pub fn teacher(mut self, id: AgentId, name: impl Into<String>, persona: impl Into<String>) -> Self {
let mut agent = DirectorAgent::new(id, name, AgentRole::Teacher, persona);
agent.priority = 10;
self.agents.push(agent);
self
}
/// Add an assistant agent
pub fn assistant(mut self, id: AgentId, name: impl Into<String>, persona: impl Into<String>) -> Self {
let mut agent = DirectorAgent::new(id, name, AgentRole::Assistant, persona);
agent.priority = 7;
self.agents.push(agent);
self
}
/// Add a student agent
pub fn student(mut self, id: AgentId, name: impl Into<String>, persona: impl Into<String>) -> Self {
let mut agent = DirectorAgent::new(id, name, AgentRole::Student, persona);
agent.priority = 5;
self.agents.push(agent);
self
}
/// Add a custom agent
pub fn agent(mut self, agent: DirectorAgent) -> Self {
self.agents.push(agent);
self
}
/// Build the director
pub async fn build(self) -> Director {
let director = Director::new(self.config);
for agent in self.agents {
director.register_agent(agent).await;
}
director
}
}
// Default delegates to new(): default config, empty roster.
impl Default for DirectorBuilder {
fn default() -> Self {
Self::new()
}
}
// ---------------------------------------------------------------------------
// Butler delegation — task decomposition and expert assignment
// ---------------------------------------------------------------------------
/// A task assigned to an expert agent by the butler.
///
/// Produced by request decomposition (`decompose_with_llm` /
/// `decompose_rule_based`) and filled in by `assign_experts`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExpertTask {
/// Unique task ID (a fresh UUID v4 string)
pub id: String,
/// The sub-task description
pub description: String,
/// Assigned expert agent (None until `assign_experts` finds a match)
pub assigned_expert: Option<DirectorAgent>,
/// Task category (logistics, compliance, customer, pricing, technology, general)
pub category: String,
/// Task priority (higher = more urgent; 1-10, default 5)
pub priority: u8,
/// Current status
pub status: ExpertTaskStatus,
}
/// Status of an expert task.
///
/// Only `Pending` → `Assigned` is set by this module (in `assign_experts`);
/// the remaining states are for downstream executors.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum ExpertTaskStatus {
#[default]
Pending,
Assigned,
InProgress,
Completed,
Failed,
}
/// Result of butler delegation.
///
/// Returned by [`Director::butler_delegate`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DelegationResult {
/// Original user request
pub request: String,
/// Decomposed sub-tasks with expert assignments
pub tasks: Vec<ExpertTask>,
/// Whether delegation was successful (currently always true on return)
pub success: bool,
/// Summary message for the user (Chinese, human-readable)
pub summary: String,
}
impl Director {
/// Butler entry point: decompose `user_request` into sub-tasks and assign
/// each to the best-matching registered expert agent.
///
/// Prefers LLM-based decomposition when a driver is configured; otherwise
/// falls back to a single rule-based task.
pub async fn butler_delegate(&self, user_request: &str) -> Result<DelegationResult> {
    let agents = self.get_active_agents().await;
    // Decompose the request into sub-tasks.
    let subtasks = if self.llm_driver.is_none() {
        Self::decompose_rule_based(user_request)
    } else {
        self.decompose_with_llm(user_request).await?
    };
    // Assign experts to each sub-task.
    let tasks = self.assign_experts(&subtasks, &agents).await;
    // Summary mentions expert assignment only when at least one task got one.
    let assigned_any = tasks.iter().any(|t| t.assigned_expert.is_some());
    let suffix = if assigned_any { ",已分派给对应专家" } else { "" };
    let summary = format!("已将您的需求拆解为 {} 个子任务{}", tasks.len(), suffix);
    Ok(DelegationResult {
        request: user_request.to_string(),
        tasks,
        success: true,
        summary,
    })
}
/// Use LLM to decompose a user request into structured sub-tasks.
///
/// Asks the model for a JSON array of {description, category, priority}
/// objects. Failure handling is layered: an unparseable reply collapses
/// into a single catch-all task, and a driver error falls back to the
/// rule-based decomposition.
///
/// # Errors
/// Returns an error only when no LLM driver is configured (callers
/// normally check `llm_driver.is_some()` first).
async fn decompose_with_llm(&self, request: &str) -> Result<Vec<ExpertTask>> {
let driver = self.llm_driver.as_ref()
.ok_or_else(|| ZclawError::InvalidInput("No LLM driver configured".into()))?;
// NOTE(review): in the "- category:" line of the prompt below a
// delimiter appears to be missing between "分类" and "logistics"
// (possibly a fullwidth colon lost to Unicode stripping) — confirm the
// intended prompt wording before editing the literal.
let prompt = format!(
r#"你是 ZCLAW 管家。请将以下用户需求拆解为 1-5 个具体子任务。
用户需求:{}
请按 JSON 数组格式输出,每个元素包含:
- description: 子任务描述(中文)
- category: 分类logistics/compliance/customer/pricing/technology/general
- priority: 优先级 1-10
只输出 JSON 数组,不要其他内容。"#,
request
);
// Low temperature + small budget: we want terse, deterministic JSON.
let completion_request = CompletionRequest {
model: "default".to_string(),
system: Some("你是任务拆解专家,只输出 JSON。".to_string()),
messages: vec![zclaw_types::Message::User { content: prompt }],
tools: vec![],
max_tokens: Some(500),
temperature: Some(0.3),
stop: vec![],
stream: false,
thinking_enabled: false,
reasoning_effort: None,
plan_mode: false,
};
match driver.complete(completion_request).await {
Ok(response) => {
// Concatenate all text blocks of the reply.
let text: String = response.content.iter()
.filter_map(|block| match block {
zclaw_runtime::ContentBlock::Text { text } => Some(text.as_str()),
_ => None,
})
.collect::<Vec<_>>()
.join("");
// Try to extract JSON array from response
let json_text = extract_json_array(&text);
match serde_json::from_str::<Vec<serde_json::Value>>(&json_text) {
Ok(items) => {
// Each array element becomes one pending task; missing fields
// get safe defaults (unnamed task, "general", priority 5).
let tasks: Vec<ExpertTask> = items.into_iter().map(|item| {
ExpertTask {
id: uuid::Uuid::new_v4().to_string(),
description: item.get("description")
.and_then(|v| v.as_str())
.unwrap_or("未命名任务")
.to_string(),
assigned_expert: None,
category: item.get("category")
.and_then(|v| v.as_str())
.unwrap_or("general")
.to_string(),
priority: item.get("priority")
.and_then(|v| v.as_u64())
.unwrap_or(5) as u8,
status: ExpertTaskStatus::Pending,
}
}).collect();
Ok(tasks)
}
Err(_) => {
// Fallback: treat the whole request as one task
Ok(vec![ExpertTask {
id: uuid::Uuid::new_v4().to_string(),
description: request.to_string(),
assigned_expert: None,
category: "general".to_string(),
priority: 5,
status: ExpertTaskStatus::Pending,
}])
}
}
}
Err(e) => {
tracing::warn!("LLM decomposition failed: {}, falling back to rule-based", e);
Ok(Self::decompose_rule_based(request))
}
}
}
/// Rule-based decomposition for when no LLM is available: the entire
/// request becomes a single pending task, categorized by keyword matching.
fn decompose_rule_based(request: &str) -> Vec<ExpertTask> {
    let task = ExpertTask {
        id: uuid::Uuid::new_v4().to_string(),
        description: request.to_string(),
        assigned_expert: None,
        category: classify_delegation_category(request),
        priority: 5,
        status: ExpertTaskStatus::Pending,
    };
    vec![task]
}
/// Pair each task with the best-matching expert agent.
///
/// An expert whose persona mentions the task's category (case-insensitive)
/// wins; otherwise the first registered expert is used; with no experts
/// at all the task is returned unchanged (still `Pending`).
async fn assign_experts(
    &self,
    tasks: &[ExpertTask],
    agents: &[DirectorAgent],
) -> Vec<ExpertTask> {
    // Narrow the roster to experts once, preserving priority order.
    let experts: Vec<&DirectorAgent> = agents
        .iter()
        .filter(|a| a.role == AgentRole::Expert)
        .collect();
    tasks
        .iter()
        .map(|task| {
            let wanted = task.category.to_lowercase();
            let chosen = experts
                .iter()
                .find(|e| e.persona.to_lowercase().contains(&wanted))
                .or_else(|| experts.first());
            let mut assigned = task.clone();
            if let Some(expert) = chosen {
                assigned.assigned_expert = Some((*expert).clone());
                assigned.status = ExpertTaskStatus::Assigned;
            }
            assigned
        })
        .collect()
}
}
/// Classify a request into a delegation category based on keyword matching.
///
/// Categories are checked in order of specificity: compliance keywords
/// ("合规/法规/标准/认证/报检") win over logistics, then customer, pricing,
/// technology, with "general" as the fallback.
fn classify_delegation_category(text: &str) -> String {
    let lower = text.to_lowercase();
    let contains_any = |keywords: &[&str]| keywords.iter().any(|k| lower.contains(k));
    // BUG FIX: the original logistics list contained an empty string ""
    // (evidently a keyword lost to the file's Unicode stripping). Since
    // `str::contains("")` is always true, EVERY non-compliance request was
    // classified as "logistics", shadowing customer/pricing/technology/
    // general. Restored as "包装" (packaging), consistent with the tests.
    if contains_any(&["合规", "法规", "标准", "认证", "报检"]) {
        "compliance".to_string()
    } else if contains_any(&["物流", "发货", "出口", "包装", "运输", "仓库"]) {
        "logistics".to_string()
    } else if contains_any(&["客户", "投诉", "反馈", "服务", "售后"]) {
        "customer".to_string()
    } else if contains_any(&["报价", "价格", "成本", "利润", "预算"]) {
        "pricing".to_string()
    } else if contains_any(&["系统", "软件", "电脑", "网络", "数据"]) {
        "technology".to_string()
    } else {
        "general".to_string()
    }
}
/// Extract a JSON array from text that may contain surrounding prose.
///
/// Returns the slice from the first '[' through the last ']' when such a
/// pair exists in the right order; otherwise returns the input unchanged.
fn extract_json_array(text: &str) -> String {
    match (text.find('['), text.rfind(']')) {
        (Some(start), Some(end)) if end > start => text[start..=end].to_string(),
        _ => text.to_string(),
    }
}
#[cfg(test)]
mod tests {
use super::*;
// A fresh director starts with an empty roster.
#[tokio::test]
async fn test_director_creation() {
let director = Director::new(DirectorConfig::default());
let agents = director.get_agents().await;
assert!(agents.is_empty());
}
// Registration keeps the roster sorted by descending priority.
#[tokio::test]
async fn test_register_agents() {
let director = Director::new(DirectorConfig::default());
director.register_agent(DirectorAgent::new(
AgentId::new(),
"Teacher",
AgentRole::Teacher,
"You are a helpful teacher.",
)).await;
director.register_agent(DirectorAgent::new(
AgentId::new(),
"Student",
AgentRole::Student,
"You are a curious student.",
)).await;
let agents = director.get_agents().await;
assert_eq!(agents.len(), 2);
// Teacher should be first (higher priority)
assert_eq!(agents[0].role, AgentRole::Teacher);
}
// Turn bookkeeping: counters and consecutive-speaker tracking.
#[tokio::test]
async fn test_conversation_state() {
let mut state = ConversationState::new();
assert_eq!(state.turn, 0);
let agent1 = AgentId::new();
let agent2 = AgentId::new();
state.record_turn(agent1, "Hello".to_string());
assert_eq!(state.turn, 1);
assert_eq!(state.consecutive_turns, 1);
state.record_turn(agent1, "World".to_string());
assert_eq!(state.turn, 2);
assert_eq!(state.consecutive_turns, 2);
state.record_turn(agent2, "Goodbye".to_string());
assert_eq!(state.turn, 3);
assert_eq!(state.consecutive_turns, 1);
assert_eq!(state.current_speaker, Some(agent2));
}
// Priority strategy picks the highest-priority registered agent.
#[tokio::test]
async fn test_select_next_speaker_priority() {
let config = DirectorConfig {
strategy: ScheduleStrategy::Priority,
..Default::default()
};
let director = Director::new(config);
let teacher_id = AgentId::new();
let student_id = AgentId::new();
director.register_agent(DirectorAgent::new(
teacher_id,
"Teacher",
AgentRole::Teacher,
"Teaching",
)).await;
director.register_agent(DirectorAgent::new(
student_id,
"Student",
AgentRole::Student,
"Learning",
)).await;
let speaker = director.select_next_speaker().await;
assert!(speaker.is_some());
assert_eq!(speaker.unwrap().role, AgentRole::Teacher);
}
// Builder wires config and agents into a ready director.
#[tokio::test]
async fn test_director_builder() {
let director = DirectorBuilder::new()
.strategy(ScheduleStrategy::RoundRobin)
.max_turns(10)
.teacher(AgentId::new(), "AI Teacher", "You teach students.")
.student(AgentId::new(), "Curious Student", "You ask questions.")
.build()
.await;
let agents = director.get_agents().await;
assert_eq!(agents.len(), 2);
let state = director.get_state().await;
assert_eq!(state.turn, 0);
}
// Role → default priority mapping.
#[test]
fn test_agent_role_priority() {
assert_eq!(AgentRole::Teacher.default_priority(), 10);
assert_eq!(AgentRole::Assistant.default_priority(), 7);
assert_eq!(AgentRole::Student.default_priority(), 5);
assert_eq!(AgentRole::Observer.default_priority(), 0);
}
// Parsing is case-insensitive; unknown names yield None.
#[test]
fn test_agent_role_parse() {
assert_eq!(AgentRole::from_str("teacher"), Some(AgentRole::Teacher));
assert_eq!(AgentRole::from_str("STUDENT"), Some(AgentRole::Student));
assert_eq!(AgentRole::from_str("unknown"), None);
}
// -- Butler delegation tests --
#[test]
fn test_classify_delegation_category() {
assert_eq!(classify_delegation_category("这批物流要发往欧洲"), "logistics");
assert_eq!(classify_delegation_category("出口合规标准变了"), "compliance");
assert_eq!(classify_delegation_category("客户投诉太多了"), "customer");
assert_eq!(classify_delegation_category("报价需要调整"), "pricing");
assert_eq!(classify_delegation_category("系统又崩了"), "technology");
assert_eq!(classify_delegation_category("随便聊聊"), "general");
}
#[test]
fn test_extract_json_array() {
let with_prose = "好的,分析如下:\n[{\"description\":\"分析物流\",\"category\":\"logistics\",\"priority\":8}]\n以上。";
let result = extract_json_array(with_prose);
assert!(result.starts_with('['));
assert!(result.ends_with(']'));
let bare = "[{\"a\":1}]";
assert_eq!(extract_json_array(bare), bare);
let no_array = "just text";
assert_eq!(extract_json_array(no_array), "just text");
}
#[tokio::test]
async fn test_rule_based_decomposition() {
let tasks = Director::decompose_rule_based("出口包装需要整改");
assert_eq!(tasks.len(), 1);
// "出口" (export) is a logistics keyword, and compliance keywords are absent
assert_eq!(tasks[0].category, "logistics");
assert_eq!(tasks[0].status, ExpertTaskStatus::Pending);
assert!(!tasks[0].id.is_empty());
}
// An expert whose persona mentions the task category gets assigned.
#[tokio::test]
async fn test_butler_delegate_rule_based() {
let director = Director::new(DirectorConfig::default());
// Register an expert
director.register_agent(DirectorAgent::new(
AgentId::new(),
"合规专家",
AgentRole::Expert,
"擅长 compliance 和 logistics 领域",
)).await;
let result = director.butler_delegate("出口包装被退回了,需要整改").await.unwrap();
assert!(result.success);
assert!(result.summary.contains("拆解为"));
assert_eq!(result.tasks.len(), 1);
// Expert should be assigned (matches category)
assert!(result.tasks[0].assigned_expert.is_some());
assert_eq!(result.tasks[0].status, ExpertTaskStatus::Assigned);
}
// With no experts the task stays Pending but delegation still succeeds.
#[tokio::test]
async fn test_butler_delegate_no_experts() {
let director = Director::new(DirectorConfig::default());
// No agents registered
let result = director.butler_delegate("帮我查一下物流状态").await.unwrap();
assert!(result.success);
assert!(result.tasks[0].assigned_expert.is_none());
assert_eq!(result.tasks[0].status, ExpertTaskStatus::Pending);
}
// Round-trip through serde preserves all fields.
#[test]
fn test_expert_task_serialization() {
let task = ExpertTask {
id: "test-id".to_string(),
description: "测试任务".to_string(),
assigned_expert: None,
category: "logistics".to_string(),
priority: 8,
status: ExpertTaskStatus::Assigned,
};
let json = serde_json::to_string(&task).unwrap();
let decoded: ExpertTask = serde_json::from_str(&json).unwrap();
assert_eq!(decoded.id, "test-id");
assert_eq!(decoded.category, "logistics");
assert_eq!(decoded.status, ExpertTaskStatus::Assigned);
}
}