feat: sub-agent streaming progress — TaskTool emits real-time status events
Some checks failed
CI / Lint & TypeCheck (push) Has been cancelled
CI / Unit Tests (push) Has been cancelled
CI / Build Frontend (push) Has been cancelled
CI / Rust Check (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
CI / E2E Tests (push) Has been cancelled
Some checks failed
CI / Lint & TypeCheck (push) Has been cancelled
CI / Unit Tests (push) Has been cancelled
CI / Build Frontend (push) Has been cancelled
CI / Rust Check (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
CI / E2E Tests (push) Has been cancelled
- Rust: LoopEvent::SubtaskStatus variant added to loop_runner.rs - Rust: ToolContext.event_sender field for streaming tool progress - Rust: TaskTool emits started/running/completed/failed via event_sender - Rust: StreamChatEvent::SubtaskStatus mapped in Tauri chat command - TS: StreamEventSubtaskStatus type + onSubtaskStatus callback added - TS: kernel-chat.ts handles subtaskStatus event from Tauri - TS: streamStore.ts wires callback, maps backend→frontend status, updates assistant message subtasks array in real-time
This commit is contained in:
@@ -206,6 +206,7 @@ impl AgentLoop {
|
|||||||
session_id: Some(session_id.to_string()),
|
session_id: Some(session_id.to_string()),
|
||||||
skill_executor: self.skill_executor.clone(),
|
skill_executor: self.skill_executor.clone(),
|
||||||
path_validator: Some(path_validator),
|
path_validator: Some(path_validator),
|
||||||
|
event_sender: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -880,6 +881,7 @@ impl AgentLoop {
|
|||||||
session_id: Some(session_id_clone.to_string()),
|
session_id: Some(session_id_clone.to_string()),
|
||||||
skill_executor: skill_executor.clone(),
|
skill_executor: skill_executor.clone(),
|
||||||
path_validator: Some(pv),
|
path_validator: Some(pv),
|
||||||
|
event_sender: Some(tx.clone()),
|
||||||
};
|
};
|
||||||
let (result, is_error) = if let Some(tool) = tools.get(&name) {
|
let (result, is_error) = if let Some(tool) = tools.get(&name) {
|
||||||
match tool.execute(new_input, &tool_context).await {
|
match tool.execute(new_input, &tool_context).await {
|
||||||
@@ -945,6 +947,7 @@ impl AgentLoop {
|
|||||||
session_id: Some(session_id_clone.to_string()),
|
session_id: Some(session_id_clone.to_string()),
|
||||||
skill_executor: skill_executor.clone(),
|
skill_executor: skill_executor.clone(),
|
||||||
path_validator: Some(pv),
|
path_validator: Some(pv),
|
||||||
|
event_sender: Some(tx.clone()),
|
||||||
};
|
};
|
||||||
|
|
||||||
let (result, is_error) = if let Some(tool) = tools.get(&name) {
|
let (result, is_error) = if let Some(tool) = tools.get(&name) {
|
||||||
@@ -1008,6 +1011,12 @@ pub enum LoopEvent {
|
|||||||
ToolStart { name: String, input: serde_json::Value },
|
ToolStart { name: String, input: serde_json::Value },
|
||||||
/// Tool execution completed
|
/// Tool execution completed
|
||||||
ToolEnd { name: String, output: serde_json::Value },
|
ToolEnd { name: String, output: serde_json::Value },
|
||||||
|
/// Sub-agent task status update (started/running/completed/failed)
|
||||||
|
SubtaskStatus {
|
||||||
|
description: String,
|
||||||
|
status: String,
|
||||||
|
detail: Option<String>,
|
||||||
|
},
|
||||||
/// New iteration started (multi-turn tool calling)
|
/// New iteration started (multi-turn tool calling)
|
||||||
IterationStart { iteration: usize, max_iterations: usize },
|
IterationStart { iteration: usize, max_iterations: usize },
|
||||||
/// Loop completed with final result
|
/// Loop completed with final result
|
||||||
|
|||||||
@@ -4,9 +4,11 @@ use std::collections::HashMap;
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
|
use tokio::sync::mpsc;
|
||||||
use zclaw_types::{AgentId, Result};
|
use zclaw_types::{AgentId, Result};
|
||||||
|
|
||||||
use crate::driver::ToolDefinition;
|
use crate::driver::ToolDefinition;
|
||||||
|
use crate::loop_runner::LoopEvent;
|
||||||
use crate::tool::builtin::PathValidator;
|
use crate::tool::builtin::PathValidator;
|
||||||
|
|
||||||
/// Tool trait for implementing agent tools
|
/// Tool trait for implementing agent tools
|
||||||
@@ -80,6 +82,9 @@ pub struct ToolContext {
|
|||||||
pub skill_executor: Option<Arc<dyn SkillExecutor>>,
|
pub skill_executor: Option<Arc<dyn SkillExecutor>>,
|
||||||
/// Path validator for file system operations
|
/// Path validator for file system operations
|
||||||
pub path_validator: Option<PathValidator>,
|
pub path_validator: Option<PathValidator>,
|
||||||
|
/// Optional event sender for streaming tool progress to the frontend.
|
||||||
|
/// Tools like TaskTool use this to emit sub-agent status events.
|
||||||
|
pub event_sender: Option<mpsc::Sender<LoopEvent>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl std::fmt::Debug for ToolContext {
|
impl std::fmt::Debug for ToolContext {
|
||||||
@@ -90,6 +95,7 @@ impl std::fmt::Debug for ToolContext {
|
|||||||
.field("session_id", &self.session_id)
|
.field("session_id", &self.session_id)
|
||||||
.field("skill_executor", &self.skill_executor.as_ref().map(|_| "SkillExecutor"))
|
.field("skill_executor", &self.skill_executor.as_ref().map(|_| "SkillExecutor"))
|
||||||
.field("path_validator", &self.path_validator.as_ref().map(|_| "PathValidator"))
|
.field("path_validator", &self.path_validator.as_ref().map(|_| "PathValidator"))
|
||||||
|
.field("event_sender", &self.event_sender.as_ref().map(|_| "Sender<LoopEvent>"))
|
||||||
.finish()
|
.finish()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -102,6 +108,7 @@ impl Clone for ToolContext {
|
|||||||
session_id: self.session_id.clone(),
|
session_id: self.session_id.clone(),
|
||||||
skill_executor: self.skill_executor.clone(),
|
skill_executor: self.skill_executor.clone(),
|
||||||
path_validator: self.path_validator.clone(),
|
path_validator: self.path_validator.clone(),
|
||||||
|
event_sender: self.event_sender.clone(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -10,7 +10,7 @@ use zclaw_types::{AgentId, Result, ZclawError};
|
|||||||
use zclaw_memory::MemoryStore;
|
use zclaw_memory::MemoryStore;
|
||||||
|
|
||||||
use crate::driver::LlmDriver;
|
use crate::driver::LlmDriver;
|
||||||
use crate::loop_runner::AgentLoop;
|
use crate::loop_runner::{AgentLoop, LoopEvent};
|
||||||
use crate::tool::{Tool, ToolContext, ToolRegistry};
|
use crate::tool::{Tool, ToolContext, ToolRegistry};
|
||||||
use crate::tool::builtin::register_builtin_tools;
|
use crate::tool::builtin::register_builtin_tools;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
@@ -106,6 +106,15 @@ impl Tool for TaskTool {
|
|||||||
description, max_iterations
|
description, max_iterations
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// Emit subtask_started event
|
||||||
|
if let Some(ref tx) = context.event_sender {
|
||||||
|
let _ = tx.send(LoopEvent::SubtaskStatus {
|
||||||
|
description: description.to_string(),
|
||||||
|
status: "started".to_string(),
|
||||||
|
detail: None,
|
||||||
|
}).await;
|
||||||
|
}
|
||||||
|
|
||||||
// Create a sub-agent with its own ID
|
// Create a sub-agent with its own ID
|
||||||
let sub_agent_id = AgentId::new();
|
let sub_agent_id = AgentId::new();
|
||||||
|
|
||||||
@@ -148,6 +157,15 @@ impl Tool for TaskTool {
|
|||||||
sub_loop = sub_loop.with_path_validator(validator.clone());
|
sub_loop = sub_loop.with_path_validator(validator.clone());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Emit subtask_running event
|
||||||
|
if let Some(ref tx) = context.event_sender {
|
||||||
|
let _ = tx.send(LoopEvent::SubtaskStatus {
|
||||||
|
description: description.to_string(),
|
||||||
|
status: "running".to_string(),
|
||||||
|
detail: Some("子Agent正在执行中...".to_string()),
|
||||||
|
}).await;
|
||||||
|
}
|
||||||
|
|
||||||
// Execute the sub-agent loop (non-streaming — collect full result)
|
// Execute the sub-agent loop (non-streaming — collect full result)
|
||||||
let result = match sub_loop.run(session_id.clone(), prompt.to_string()).await {
|
let result = match sub_loop.run(session_id.clone(), prompt.to_string()).await {
|
||||||
Ok(loop_result) => {
|
Ok(loop_result) => {
|
||||||
@@ -155,6 +173,19 @@ impl Tool for TaskTool {
|
|||||||
"[TaskTool] Sub-agent completed: {} iterations, {} input tokens, {} output tokens",
|
"[TaskTool] Sub-agent completed: {} iterations, {} input tokens, {} output tokens",
|
||||||
loop_result.iterations, loop_result.input_tokens, loop_result.output_tokens
|
loop_result.iterations, loop_result.input_tokens, loop_result.output_tokens
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// Emit subtask_completed event
|
||||||
|
if let Some(ref tx) = context.event_sender {
|
||||||
|
let _ = tx.send(LoopEvent::SubtaskStatus {
|
||||||
|
description: description.to_string(),
|
||||||
|
status: "completed".to_string(),
|
||||||
|
detail: Some(format!(
|
||||||
|
"完成 ({}次迭代, {}输入token)",
|
||||||
|
loop_result.iterations, loop_result.input_tokens
|
||||||
|
)),
|
||||||
|
}).await;
|
||||||
|
}
|
||||||
|
|
||||||
json!({
|
json!({
|
||||||
"status": "completed",
|
"status": "completed",
|
||||||
"description": description,
|
"description": description,
|
||||||
@@ -166,6 +197,16 @@ impl Tool for TaskTool {
|
|||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
tracing::warn!("[TaskTool] Sub-agent failed: {}", e);
|
tracing::warn!("[TaskTool] Sub-agent failed: {}", e);
|
||||||
|
|
||||||
|
// Emit subtask_failed event
|
||||||
|
if let Some(ref tx) = context.event_sender {
|
||||||
|
let _ = tx.send(LoopEvent::SubtaskStatus {
|
||||||
|
description: description.to_string(),
|
||||||
|
status: "failed".to_string(),
|
||||||
|
detail: Some(e.to_string()),
|
||||||
|
}).await;
|
||||||
|
}
|
||||||
|
|
||||||
json!({
|
json!({
|
||||||
"status": "failed",
|
"status": "failed",
|
||||||
"description": description,
|
"description": description,
|
||||||
|
|||||||
@@ -37,6 +37,7 @@ pub enum StreamChatEvent {
|
|||||||
ThinkingDelta { delta: String },
|
ThinkingDelta { delta: String },
|
||||||
ToolStart { name: String, input: serde_json::Value },
|
ToolStart { name: String, input: serde_json::Value },
|
||||||
ToolEnd { name: String, output: serde_json::Value },
|
ToolEnd { name: String, output: serde_json::Value },
|
||||||
|
SubtaskStatus { description: String, status: String, detail: Option<String> },
|
||||||
IterationStart { iteration: usize, max_iterations: usize },
|
IterationStart { iteration: usize, max_iterations: usize },
|
||||||
HandStart { name: String, params: serde_json::Value },
|
HandStart { name: String, params: serde_json::Value },
|
||||||
HandEnd { name: String, result: serde_json::Value },
|
HandEnd { name: String, result: serde_json::Value },
|
||||||
@@ -294,6 +295,14 @@ pub async fn agent_chat_stream(
|
|||||||
StreamChatEvent::ToolEnd { name: name.clone(), output: output.clone() }
|
StreamChatEvent::ToolEnd { name: name.clone(), output: output.clone() }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
LoopEvent::SubtaskStatus { description, status, detail } => {
|
||||||
|
tracing::debug!("[agent_chat_stream] SubtaskStatus: {} - {}", description, status);
|
||||||
|
StreamChatEvent::SubtaskStatus {
|
||||||
|
description: description.clone(),
|
||||||
|
status: status.clone(),
|
||||||
|
detail: detail.clone(),
|
||||||
|
}
|
||||||
|
}
|
||||||
LoopEvent::IterationStart { iteration, max_iterations } => {
|
LoopEvent::IterationStart { iteration, max_iterations } => {
|
||||||
tracing::debug!("[agent_chat_stream] IterationStart: {}/{}", iteration, max_iterations);
|
tracing::debug!("[agent_chat_stream] IterationStart: {}/{}", iteration, max_iterations);
|
||||||
StreamChatEvent::IterationStart { iteration: *iteration, max_iterations: *max_iterations }
|
StreamChatEvent::IterationStart { iteration: *iteration, max_iterations: *max_iterations }
|
||||||
|
|||||||
@@ -146,6 +146,17 @@ export function installChatMethods(ClientClass: { prototype: KernelClient }): vo
|
|||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
|
||||||
|
case 'subtaskStatus':
|
||||||
|
log.debug('Subtask status:', streamEvent.description, streamEvent.status, streamEvent.detail);
|
||||||
|
if (callbacks.onSubtaskStatus) {
|
||||||
|
callbacks.onSubtaskStatus(
|
||||||
|
streamEvent.description,
|
||||||
|
streamEvent.status,
|
||||||
|
streamEvent.detail ?? undefined
|
||||||
|
);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
|
||||||
case 'iterationStart':
|
case 'iterationStart':
|
||||||
log.debug('Iteration started:', streamEvent.iteration, '/', streamEvent.maxIterations);
|
log.debug('Iteration started:', streamEvent.iteration, '/', streamEvent.maxIterations);
|
||||||
// Don't need to notify user about iterations
|
// Don't need to notify user about iterations
|
||||||
|
|||||||
@@ -69,6 +69,7 @@ export interface StreamCallbacks {
|
|||||||
onThinkingDelta?: (delta: string) => void;
|
onThinkingDelta?: (delta: string) => void;
|
||||||
onTool?: (tool: string, input: string, output: string) => void;
|
onTool?: (tool: string, input: string, output: string) => void;
|
||||||
onHand?: (name: string, status: string, result?: unknown) => void;
|
onHand?: (name: string, status: string, result?: unknown) => void;
|
||||||
|
onSubtaskStatus?: (description: string, status: string, detail?: string) => void;
|
||||||
onComplete: (inputTokens?: number, outputTokens?: number) => void;
|
onComplete: (inputTokens?: number, outputTokens?: number) => void;
|
||||||
onError: (error: string) => void;
|
onError: (error: string) => void;
|
||||||
}
|
}
|
||||||
@@ -126,6 +127,13 @@ export interface StreamEventHandEnd {
|
|||||||
result: unknown;
|
result: unknown;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export interface StreamEventSubtaskStatus {
|
||||||
|
type: 'subtaskStatus';
|
||||||
|
description: string;
|
||||||
|
status: string;
|
||||||
|
detail?: string;
|
||||||
|
}
|
||||||
|
|
||||||
export type StreamChatEvent =
|
export type StreamChatEvent =
|
||||||
| StreamEventDelta
|
| StreamEventDelta
|
||||||
| StreamEventThinkingDelta
|
| StreamEventThinkingDelta
|
||||||
@@ -134,6 +142,7 @@ export type StreamChatEvent =
|
|||||||
| StreamEventIterationStart
|
| StreamEventIterationStart
|
||||||
| StreamEventHandStart
|
| StreamEventHandStart
|
||||||
| StreamEventHandEnd
|
| StreamEventHandEnd
|
||||||
|
| StreamEventSubtaskStatus
|
||||||
| StreamEventComplete
|
| StreamEventComplete
|
||||||
| StreamEventError;
|
| StreamEventError;
|
||||||
|
|
||||||
|
|||||||
@@ -382,6 +382,35 @@ export const useStreamStore = create<StreamState>()(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
onSubtaskStatus: (description: string, status: string, detail?: string) => {
|
||||||
|
// Map backend status to frontend Subtask status
|
||||||
|
const statusMap: Record<string, Subtask['status']> = {
|
||||||
|
started: 'pending',
|
||||||
|
running: 'in_progress',
|
||||||
|
completed: 'completed',
|
||||||
|
failed: 'failed',
|
||||||
|
};
|
||||||
|
const mappedStatus = statusMap[status] || 'in_progress';
|
||||||
|
|
||||||
|
_chat?.updateMessages(msgs =>
|
||||||
|
msgs.map(m => {
|
||||||
|
if (m.id !== assistantId) return m;
|
||||||
|
const subtasks = [...(m.subtasks || [])];
|
||||||
|
const existingIdx = subtasks.findIndex(st => st.description === description);
|
||||||
|
if (existingIdx >= 0) {
|
||||||
|
subtasks[existingIdx] = { ...subtasks[existingIdx], status: mappedStatus, result: detail };
|
||||||
|
} else {
|
||||||
|
subtasks.push({
|
||||||
|
id: `subtask_${Date.now()}_${generateRandomString(4)}`,
|
||||||
|
description,
|
||||||
|
status: mappedStatus,
|
||||||
|
result: detail,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
return { ...m, subtasks };
|
||||||
|
})
|
||||||
|
);
|
||||||
|
},
|
||||||
onComplete: (inputTokens?: number, outputTokens?: number) => {
|
onComplete: (inputTokens?: number, outputTokens?: number) => {
|
||||||
const currentMsgs = _chat?.getMessages();
|
const currentMsgs = _chat?.getMessages();
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user