From 9871c254be1ec07becf3742227b06315a58d36e6 Mon Sep 17 00:00:00 2001 From: iven Date: Mon, 6 Apr 2026 13:05:37 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20sub-agent=20streaming=20progress=20?= =?UTF-8?q?=E2=80=94=20TaskTool=20emits=20real-time=20status=20events?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Rust: LoopEvent::SubtaskStatus variant added to loop_runner.rs - Rust: ToolContext.event_sender field for streaming tool progress - Rust: TaskTool emits started/running/completed/failed via event_sender - Rust: StreamChatEvent::SubtaskStatus mapped in Tauri chat command - TS: StreamEventSubtaskStatus type + onSubtaskStatus callback added - TS: kernel-chat.ts handles subtaskStatus event from Tauri - TS: streamStore.ts wires callback, maps backend→frontend status, updates assistant message subtasks array in real-time --- crates/zclaw-runtime/src/loop_runner.rs | 9 ++++ crates/zclaw-runtime/src/tool.rs | 7 +++ crates/zclaw-runtime/src/tool/builtin/task.rs | 43 ++++++++++++++++++- desktop/src-tauri/src/kernel_commands/chat.rs | 9 ++++ desktop/src/lib/kernel-chat.ts | 11 +++++ desktop/src/lib/kernel-types.ts | 9 ++++ desktop/src/store/chat/streamStore.ts | 29 +++++++++++++ 7 files changed, 116 insertions(+), 1 deletion(-) diff --git a/crates/zclaw-runtime/src/loop_runner.rs b/crates/zclaw-runtime/src/loop_runner.rs index 2c14450..9877774 100644 --- a/crates/zclaw-runtime/src/loop_runner.rs +++ b/crates/zclaw-runtime/src/loop_runner.rs @@ -206,6 +206,7 @@ impl AgentLoop { session_id: Some(session_id.to_string()), skill_executor: self.skill_executor.clone(), path_validator: Some(path_validator), + event_sender: None, } } @@ -880,6 +881,7 @@ impl AgentLoop { session_id: Some(session_id_clone.to_string()), skill_executor: skill_executor.clone(), path_validator: Some(pv), + event_sender: Some(tx.clone()), }; let (result, is_error) = if let Some(tool) = tools.get(&name) { match tool.execute(new_input, &tool_context).await { @@ -945,6 +947,7 @@ impl AgentLoop { session_id: Some(session_id_clone.to_string()), skill_executor: skill_executor.clone(), path_validator: Some(pv), + event_sender: Some(tx.clone()), }; let (result, is_error) = if let Some(tool) = tools.get(&name) { @@ -1008,6 +1011,12 @@ pub enum LoopEvent { ToolStart { name: String, input: serde_json::Value }, /// Tool execution completed ToolEnd { name: String, output: serde_json::Value }, + /// Sub-agent task status update (started/running/completed/failed) + SubtaskStatus { + description: String, + status: String, + detail: Option, + }, /// New iteration started (multi-turn tool calling) IterationStart { iteration: usize, max_iterations: usize }, /// Loop completed with final result diff --git a/crates/zclaw-runtime/src/tool.rs b/crates/zclaw-runtime/src/tool.rs index 0e15a48..fdc820e 100644 --- a/crates/zclaw-runtime/src/tool.rs +++ b/crates/zclaw-runtime/src/tool.rs @@ -4,9 +4,11 @@ use std::collections::HashMap; use std::sync::Arc; use async_trait::async_trait; use serde_json::Value; +use tokio::sync::mpsc; use zclaw_types::{AgentId, Result}; use crate::driver::ToolDefinition; +use crate::loop_runner::LoopEvent; use crate::tool::builtin::PathValidator; /// Tool trait for implementing agent tools @@ -80,6 +82,9 @@ pub struct ToolContext { pub skill_executor: Option>, /// Path validator for file system operations pub path_validator: Option, + /// Optional event sender for streaming tool progress to the frontend. + /// Tools like TaskTool use this to emit sub-agent status events. + pub event_sender: Option>, } impl std::fmt::Debug for ToolContext { @@ -90,6 +95,7 @@ impl std::fmt::Debug for ToolContext { .field("session_id", &self.session_id) .field("skill_executor", &self.skill_executor.as_ref().map(|_| "SkillExecutor")) .field("path_validator", &self.path_validator.as_ref().map(|_| "PathValidator")) + .field("event_sender", &self.event_sender.as_ref().map(|_| "Sender")) .finish() } } @@ -102,6 +108,7 @@ impl Clone for ToolContext { session_id: self.session_id.clone(), skill_executor: self.skill_executor.clone(), path_validator: self.path_validator.clone(), + event_sender: self.event_sender.clone(), } } } diff --git a/crates/zclaw-runtime/src/tool/builtin/task.rs b/crates/zclaw-runtime/src/tool/builtin/task.rs index 24e8936..eec7a30 100644 --- a/crates/zclaw-runtime/src/tool/builtin/task.rs +++ b/crates/zclaw-runtime/src/tool/builtin/task.rs @@ -10,7 +10,7 @@ use zclaw_types::{AgentId, Result, ZclawError}; use zclaw_memory::MemoryStore; use crate::driver::LlmDriver; -use crate::loop_runner::AgentLoop; +use crate::loop_runner::{AgentLoop, LoopEvent}; use crate::tool::{Tool, ToolContext, ToolRegistry}; use crate::tool::builtin::register_builtin_tools; use std::sync::Arc; @@ -106,6 +106,15 @@ impl Tool for TaskTool { description, max_iterations ); + // Emit subtask_started event + if let Some(ref tx) = context.event_sender { + let _ = tx.send(LoopEvent::SubtaskStatus { + description: description.to_string(), + status: "started".to_string(), + detail: None, + }).await; + } + // Create a sub-agent with its own ID let sub_agent_id = AgentId::new(); @@ -148,6 +157,15 @@ impl Tool for TaskTool { sub_loop = sub_loop.with_path_validator(validator.clone()); } + // Emit subtask_running event + if let Some(ref tx) = context.event_sender { + let _ = tx.send(LoopEvent::SubtaskStatus { + description: description.to_string(), + status: "running".to_string(), + detail: Some("子Agent正在执行中...".to_string()), + }).await; + } + // Execute the sub-agent loop (non-streaming — collect full result) let result = match sub_loop.run(session_id.clone(), prompt.to_string()).await { Ok(loop_result) => { @@ -155,6 +173,19 @@ impl Tool for TaskTool { "[TaskTool] Sub-agent completed: {} iterations, {} input tokens, {} output tokens", loop_result.iterations, loop_result.input_tokens, loop_result.output_tokens ); + + // Emit subtask_completed event + if let Some(ref tx) = context.event_sender { + let _ = tx.send(LoopEvent::SubtaskStatus { + description: description.to_string(), + status: "completed".to_string(), + detail: Some(format!( + "完成 ({}次迭代, {}输入token)", + loop_result.iterations, loop_result.input_tokens + )), + }).await; + } + json!({ "status": "completed", "description": description, @@ -166,6 +197,16 @@ impl Tool for TaskTool { } Err(e) => { tracing::warn!("[TaskTool] Sub-agent failed: {}", e); + + // Emit subtask_failed event + if let Some(ref tx) = context.event_sender { + let _ = tx.send(LoopEvent::SubtaskStatus { + description: description.to_string(), + status: "failed".to_string(), + detail: Some(e.to_string()), + }).await; + } + json!({ "status": "failed", "description": description, diff --git a/desktop/src-tauri/src/kernel_commands/chat.rs b/desktop/src-tauri/src/kernel_commands/chat.rs index 94e2cd7..65320e9 100644 --- a/desktop/src-tauri/src/kernel_commands/chat.rs +++ b/desktop/src-tauri/src/kernel_commands/chat.rs @@ -37,6 +37,7 @@ pub enum StreamChatEvent { ThinkingDelta { delta: String }, ToolStart { name: String, input: serde_json::Value }, ToolEnd { name: String, output: serde_json::Value }, + SubtaskStatus { description: String, status: String, detail: Option }, IterationStart { iteration: usize, max_iterations: usize }, HandStart { name: String, params: serde_json::Value }, HandEnd { name: String, result: serde_json::Value }, @@ -294,6 +295,14 @@ pub async fn agent_chat_stream( StreamChatEvent::ToolEnd { name: name.clone(), output: output.clone() } } } + LoopEvent::SubtaskStatus { description, status, detail } => { + tracing::debug!("[agent_chat_stream] SubtaskStatus: {} - {}", description, status); + StreamChatEvent::SubtaskStatus { + description: description.clone(), + status: status.clone(), + detail: detail.clone(), + } + } LoopEvent::IterationStart { iteration, max_iterations } => { tracing::debug!("[agent_chat_stream] IterationStart: {}/{}", iteration, max_iterations); StreamChatEvent::IterationStart { iteration: *iteration, max_iterations: *max_iterations } diff --git a/desktop/src/lib/kernel-chat.ts b/desktop/src/lib/kernel-chat.ts index abc99fc..7a855f9 100644 --- a/desktop/src/lib/kernel-chat.ts +++ b/desktop/src/lib/kernel-chat.ts @@ -146,6 +146,17 @@ export function installChatMethods(ClientClass: { prototype: KernelClient }): vo } break; + case 'subtaskStatus': + log.debug('Subtask status:', streamEvent.description, streamEvent.status, streamEvent.detail); + if (callbacks.onSubtaskStatus) { + callbacks.onSubtaskStatus( + streamEvent.description, + streamEvent.status, + streamEvent.detail ?? undefined + ); + } + break; + case 'iterationStart': log.debug('Iteration started:', streamEvent.iteration, '/', streamEvent.maxIterations); // Don't need to notify user about iterations diff --git a/desktop/src/lib/kernel-types.ts b/desktop/src/lib/kernel-types.ts index bccd873..cf991c9 100644 --- a/desktop/src/lib/kernel-types.ts +++ b/desktop/src/lib/kernel-types.ts @@ -69,6 +69,7 @@ export interface StreamCallbacks { onThinkingDelta?: (delta: string) => void; onTool?: (tool: string, input: string, output: string) => void; onHand?: (name: string, status: string, result?: unknown) => void; + onSubtaskStatus?: (description: string, status: string, detail?: string) => void; onComplete: (inputTokens?: number, outputTokens?: number) => void; onError: (error: string) => void; } @@ -126,6 +127,13 @@ export interface StreamEventHandEnd { result: unknown; } +export interface StreamEventSubtaskStatus { + type: 'subtaskStatus'; + description: string; + status: string; + detail?: string; +} + export type StreamChatEvent = | StreamEventDelta | StreamEventThinkingDelta @@ -134,6 +142,7 @@ export type StreamChatEvent = | StreamEventIterationStart | StreamEventHandStart | StreamEventHandEnd + | StreamEventSubtaskStatus | StreamEventComplete | StreamEventError; diff --git a/desktop/src/store/chat/streamStore.ts b/desktop/src/store/chat/streamStore.ts index 0cad221..8f297fd 100644 --- a/desktop/src/store/chat/streamStore.ts +++ b/desktop/src/store/chat/streamStore.ts @@ -382,6 +382,35 @@ export const useStreamStore = create()( } } }, + onSubtaskStatus: (description: string, status: string, detail?: string) => { + // Map backend status to frontend Subtask status + const statusMap: Record = { + started: 'pending', + running: 'in_progress', + completed: 'completed', + failed: 'failed', + }; + const mappedStatus = statusMap[status] || 'in_progress'; + + _chat?.updateMessages(msgs => + msgs.map(m => { + if (m.id !== assistantId) return m; + const subtasks = [...(m.subtasks || [])]; + const existingIdx = subtasks.findIndex(st => st.description === description); + if (existingIdx >= 0) { + subtasks[existingIdx] = { ...subtasks[existingIdx], status: mappedStatus, result: detail }; + } else { + subtasks.push({ + id: `subtask_${Date.now()}_${generateRandomString(4)}`, + description, + status: mappedStatus, + result: detail, + }); + } + return { ...m, subtasks }; + }) + ); + }, onComplete: (inputTokens?: number, outputTokens?: number) => { const currentMsgs = _chat?.getMessages();