feat(streaming): add Tauri streaming chat command

Add agent_chat_stream Tauri command that:
- Accepts StreamChatRequest with agent_id, session_id, message
- Gets streaming receiver from kernel.send_message_stream()
- Spawns background task to emit Tauri events ("stream:chunk")
- Emits StreamChatEvent types (Delta, ToolStart, ToolEnd, Complete, Error)
- Includes session_id for frontend routing

Registered in lib.rs invoke_handler.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
iven
2026-03-24 01:50:47 +08:00
parent 6f82723225
commit 936c922081
3 changed files with 175 additions and 2 deletions

View File

@@ -4,9 +4,10 @@
//! eliminating the need for external OpenFang process.
use std::sync::Arc;
use tauri::{AppHandle, Manager, State};
use tauri::{AppHandle, Emitter, Manager, State};
use serde::{Deserialize, Serialize};
use tokio::sync::Mutex;
use futures::StreamExt;
use zclaw_kernel::Kernel;
use zclaw_types::{AgentConfig, AgentId, AgentInfo, AgentState};
@@ -342,6 +343,102 @@ pub async fn agent_chat(
})
}
/// Streaming chat event for Tauri emission
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase", tag = "type")]
pub enum StreamChatEvent {
/// Text delta received
Delta { delta: String },
/// Tool use started
ToolStart { name: String, input: serde_json::Value },
/// Tool use completed
ToolEnd { name: String, output: serde_json::Value },
/// Stream completed
Complete { input_tokens: u32, output_tokens: u32 },
/// Error occurred
Error { message: String },
}
/// Streaming chat request
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct StreamChatRequest {
/// Agent ID
pub agent_id: String,
/// Session ID (for event routing)
pub session_id: String,
/// Message content
pub message: String,
}
/// Send a message to an agent with streaming response
///
/// This command initiates a streaming chat session. Events are emitted
/// via Tauri's event system with the name "stream:chunk" and include
/// the session_id for routing.
#[tauri::command]
pub async fn agent_chat_stream(
app: AppHandle,
state: State<'_, KernelState>,
request: StreamChatRequest,
) -> Result<(), String> {
// Parse agent ID first
let id: AgentId = request.agent_id.parse()
.map_err(|_| "Invalid agent ID format".to_string())?;
let session_id = request.session_id.clone();
let message = request.message;
// Get the streaming receiver while holding the lock, then release it
let mut rx = {
let kernel_lock = state.lock().await;
let kernel = kernel_lock.as_ref()
.ok_or_else(|| "Kernel not initialized. Call kernel_init first.".to_string())?;
// Start the stream - this spawns a background task
kernel.send_message_stream(&id, message)
.await
.map_err(|e| format!("Failed to start streaming: {}", e))?
};
// Lock is released here
// Spawn a task to process stream events
tokio::spawn(async move {
use zclaw_runtime::LoopEvent;
while let Some(event) = rx.recv().await {
let stream_event = match event {
LoopEvent::Delta(delta) => {
StreamChatEvent::Delta { delta }
}
LoopEvent::ToolStart { name, input } => {
StreamChatEvent::ToolStart { name, input }
}
LoopEvent::ToolEnd { name, output } => {
StreamChatEvent::ToolEnd { name, output }
}
LoopEvent::Complete(result) => {
StreamChatEvent::Complete {
input_tokens: result.input_tokens,
output_tokens: result.output_tokens,
}
}
LoopEvent::Error(message) => {
StreamChatEvent::Error { message }
}
};
// Emit the event with session_id for routing
let _ = app.emit("stream:chunk", serde_json::json!({
"sessionId": session_id,
"event": stream_event
}));
}
});
Ok(())
}
/// Create the kernel state for Tauri
pub fn create_kernel_state() -> KernelState {
Arc::new(Mutex::new(None))

View File

@@ -1332,6 +1332,7 @@ pub fn run() {
kernel_commands::agent_get,
kernel_commands::agent_delete,
kernel_commands::agent_chat,
kernel_commands::agent_chat_stream,
// OpenFang commands (new naming)
openfang_status,
openfang_start,
@@ -1347,7 +1348,6 @@ pub fn run() {
openfang_process_logs,
openfang_version,
// Health check commands
openfang_health_check,
openfang_ping,
// Backward-compatible aliases (OpenClaw naming)
gateway_status,