zclaw_openfang/docs/superpowers/plans/2026-03-24-v0.2.0-release-implementation.md
2026-03-24 01:34:17 +08:00

ZCLAW v0.2.0 Release Implementation Plan

For agentic workers: REQUIRED: Use superpowers:subagent-driven-development (if subagents available) or superpowers:executing-plans to implement this plan. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Ship the official ZCLAW v0.2.0 release, including streaming responses, the MCP protocol, Browser Hand, and tool security features.

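Architecture: A Tauri-based desktop application with a Rust backend and a React frontend. Streaming responses are delivered through the Tauri event system, MCP uses the JSON-RPC 2.0 protocol, and Browser Hand is built on playwright-rust.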

Tech Stack: Rust, Tauri 2.x, React 19, TypeScript, playwright-rust, reqwest, tokio

Spec Document: docs/superpowers/specs/2026-03-24-v0.2.0-release-plan-design.md


File Structure

New files

crates/zclaw-runtime/
├── src/stream.rs                    # Streaming response type definitions (modified)
├── src/driver/streaming.rs          # New - streaming driver implementation

crates/zclaw-protocols/
├── src/mcp_client.rs                # New - MCP client implementation
├── src/mcp_types.rs                 # New - MCP JSON-RPC types
├── src/mcp_transport.rs             # New - MCP transport layer (stdio/HTTP/WS)

crates/zclaw-hands/
├── src/hands/browser.rs             # New - Browser Hand implementation

config/
├── security.toml                    # New - security configuration

desktop/src/
├── lib/streaming-client.ts          # New - streaming client

Modified files

crates/zclaw-runtime/src/driver/mod.rs       # Add stream() to the LlmDriver trait
crates/zclaw-runtime/src/driver/anthropic.rs # Anthropic streaming implementation
crates/zclaw-runtime/src/driver/openai.rs    # OpenAI streaming implementation
crates/zclaw-runtime/src/loop_runner.rs      # Streaming loop implementation
crates/zclaw-protocols/src/mcp.rs            # Full MCP implementation
crates/zclaw-protocols/src/lib.rs            # Export the new modules
crates/zclaw-hands/src/hands/mod.rs          # Export Browser Hand
crates/zclaw-hands/Cargo.toml                # Add the playwright dependency
desktop/src/store/chatStore.ts               # Streaming event handling
desktop/src-tauri/src/lib.rs                 # Tauri streaming command

Chunk 1: Streaming Responses - Backend Trait and Types

Task 1.1: Define the streaming response types

Files:

  • Modify: crates/zclaw-runtime/src/stream.rs

  • Step 1: Add the streaming response type definitions

Add to stream.rs:

//! Streaming response types

use serde::{Deserialize, Serialize};

/// Stream chunk emitted during streaming
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum StreamChunk {
    /// Text delta
    TextDelta { delta: String },
    /// Thinking delta (for extended thinking models)
    ThinkingDelta { delta: String },
    /// Tool use started
    ToolUseStart { id: String, name: String },
    /// Tool use input delta
    ToolUseDelta { id: String, delta: String },
    /// Tool use completed
    ToolUseEnd { id: String, input: serde_json::Value },
    /// Stream completed
    Complete {
        input_tokens: u32,
        output_tokens: u32,
        stop_reason: String,
    },
    /// Error occurred
    Error { message: String },
}

/// Streaming event for Tauri emission
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamEvent {
    /// Session ID for routing
    pub session_id: String,
    /// Agent ID for routing
    pub agent_id: String,
    /// The chunk content
    pub chunk: StreamChunk,
}

impl StreamEvent {
    pub fn new(session_id: impl Into<String>, agent_id: impl Into<String>, chunk: StreamChunk) -> Self {
        Self {
            session_id: session_id.into(),
            agent_id: agent_id.into(),
            chunk,
        }
    }
}
  • Step 2: Verify that it compiles

Run: cd crates/zclaw-runtime && cargo check
Expected: The build succeeds with no errors

  • Step 3: Commit the type definitions
git add crates/zclaw-runtime/src/stream.rs
git commit -m "feat(runtime): add streaming response types

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>"

Task 1.2: Extend the LlmDriver trait

Files:

  • Modify: crates/zclaw-runtime/src/driver/mod.rs

  • Step 1: Add the stream method to the trait

Add the method to the LlmDriver trait:

use futures::Stream;
use std::pin::Pin;

use crate::stream::StreamChunk;

/// LLM Driver trait - unified interface for all providers
#[async_trait]
pub trait LlmDriver: Send + Sync {
    /// Get the provider name
    fn provider(&self) -> &str;

    /// Send a completion request
    async fn complete(&self, request: CompletionRequest) -> Result<CompletionResponse>;

    /// Send a streaming completion request
    /// Returns a stream of chunks
    fn stream(
        &self,
        request: CompletionRequest,
    ) -> Pin<Box<dyn Stream<Item = Result<StreamChunk>> + Send + '_>>;

    /// Check if the driver is properly configured
    fn is_configured(&self) -> bool;
}
  • Step 2: Add the futures dependency to Cargo.toml

Check whether crates/zclaw-runtime/Cargo.toml already depends on futures; if not, add:

futures = "0.3"
  • Step 3: Verify compilation (expected to fail)

Run: cd crates/zclaw-runtime && cargo check 2>&1
Expected: The build fails because the existing drivers do not yet implement stream()

  • Step 4: Commit the trait extension
git add crates/zclaw-runtime/src/driver/mod.rs crates/zclaw-runtime/Cargo.toml
git commit -m "feat(runtime): add stream() method to LlmDriver trait

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>"

Chunk 2: Streaming Responses - Anthropic Driver Implementation

Task 2.1: Implement the Anthropic streaming API

Files:

  • Modify: crates/zclaw-runtime/src/driver/anthropic.rs

  • Step 1: Add the streaming request types

Add the SSE parsing types to anthropic.rs:

use futures::{Stream, StreamExt};
use crate::stream::StreamChunk;
use std::pin::Pin;

/// SSE event from Anthropic API
#[derive(Debug, Deserialize)]
struct AnthropicStreamEvent {
    #[serde(rename = "type")]
    event_type: String,
    #[serde(default)]
    index: Option<u32>,
    #[serde(default)]
    delta: Option<AnthropicDelta>,
    #[serde(default)]
    content_block: Option<AnthropicContentBlock>,
    /// Present on `message_start`
    #[serde(default)]
    message: Option<AnthropicStreamMessage>,
    /// Present at the top level of `message_delta`
    #[serde(default)]
    usage: Option<AnthropicStreamUsage>,
    /// Present on `error`
    #[serde(default)]
    error: Option<AnthropicStreamError>,
}

#[derive(Debug, Deserialize)]
struct AnthropicDelta {
    #[serde(default)]
    text: Option<String>,
    #[serde(default)]
    thinking: Option<String>,
    #[serde(default)]
    partial_json: Option<String>,
    /// Set on the `message_delta` event
    #[serde(default)]
    stop_reason: Option<String>,
}

#[derive(Debug, Deserialize)]
struct AnthropicStreamMessage {
    #[serde(default)]
    usage: Option<AnthropicStreamUsage>,
}

/// Usage counters; each event carries only some of them, so both default to 0
#[derive(Debug, Deserialize)]
struct AnthropicStreamUsage {
    #[serde(default)]
    input_tokens: u32,
    #[serde(default)]
    output_tokens: u32,
}

#[derive(Debug, Deserialize)]
struct AnthropicStreamError {
    #[serde(default)]
    message: String,
}
  • Step 2: Implement the stream method
fn stream(
    &self,
    request: CompletionRequest,
) -> Pin<Box<dyn Stream<Item = Result<StreamChunk>> + Send + '_>> {
    Box::pin(async_stream::stream! {
        let mut stream_request = self.build_api_request(&request);
        stream_request.stream = true;

        let response = match self.client
            .post(format!("{}/v1/messages", self.base_url))
            .header("x-api-key", self.api_key.expose_secret())
            .header("anthropic-version", "2023-06-01")
            .header("content-type", "application/json")
            .json(&stream_request)
            .send()
            .await
        {
            Ok(r) => r,
            Err(e) => {
                yield Err(ZclawError::LlmError(format!("HTTP request failed: {}", e)));
                return;
            }
        };

        if !response.status().is_success() {
            let status = response.status();
            let body = response.text().await.unwrap_or_default();
            yield Err(ZclawError::LlmError(format!("API error {}: {}", status, body)));
            return;
        }

        let mut byte_stream = response.bytes_stream();
        // Bytes received so far that do not yet form a complete line
        let mut line_buffer: Vec<u8> = Vec::new();
        let mut current_tool_id: Option<String> = None;
        let mut tool_input_buffer = String::new();
        let mut input_tokens = 0u32;
        let mut output_tokens = 0u32;

        while let Some(chunk_result) = byte_stream.next().await {
            let chunk = match chunk_result {
                Ok(c) => c,
                Err(e) => {
                    yield Err(ZclawError::LlmError(format!("Stream error: {}", e)));
                    continue;
                }
            };

            // A chunk can end mid-line (or mid-character), so parse only complete lines
            line_buffer.extend_from_slice(&chunk);
            let mut lines: Vec<String> = Vec::new();
            while let Some(pos) = line_buffer.iter().position(|b| *b == b'\n') {
                let raw: Vec<u8> = line_buffer.drain(..=pos).collect();
                lines.push(String::from_utf8_lossy(&raw).trim_end().to_string());
            }

            for line in &lines {
                if let Some(data) = line.strip_prefix("data: ") {
                    if data == "[DONE]" {
                        continue;
                    }

                    match serde_json::from_str::<AnthropicStreamEvent>(data) {
                        Ok(event) => {
                            match event.event_type.as_str() {
                                "message_start" => {
                                    // Input token usage is reported once, up front
                                    if let Some(usage) = event.message.and_then(|m| m.usage) {
                                        input_tokens = usage.input_tokens;
                                    }
                                }
                                "content_block_delta" => {
                                    if let Some(delta) = event.delta {
                                        if let Some(text) = delta.text {
                                            yield Ok(StreamChunk::TextDelta { delta: text });
                                        }
                                        if let Some(thinking) = delta.thinking {
                                            yield Ok(StreamChunk::ThinkingDelta { delta: thinking });
                                        }
                                        if let Some(json) = delta.partial_json {
                                            tool_input_buffer.push_str(&json);
                                        }
                                    }
                                }
                                "content_block_start" => {
                                    if let Some(block) = event.content_block {
                                        match block.block_type.as_str() {
                                            "tool_use" => {
                                                current_tool_id = block.id.clone();
                                                tool_input_buffer.clear();
                                                yield Ok(StreamChunk::ToolUseStart {
                                                    id: block.id.unwrap_or_default(),
                                                    name: block.name.unwrap_or_default(),
                                                });
                                            }
                                            _ => {}
                                        }
                                    }
                                }
                                "content_block_stop" => {
                                    if current_tool_id.is_some() {
                                        // An empty or invalid buffer falls back to an empty object
                                        let input: serde_json::Value = serde_json::from_str(&tool_input_buffer)
                                            .unwrap_or(serde_json::Value::Object(Default::default()));
                                        yield Ok(StreamChunk::ToolUseEnd {
                                            id: current_tool_id.take().unwrap_or_default(),
                                            input,
                                        });
                                        tool_input_buffer.clear();
                                    }
                                }
                                "message_delta" => {
                                    // stop_reason arrives under `delta`, usage at the top level
                                    if let Some(usage) = event.usage {
                                        output_tokens = usage.output_tokens;
                                    }
                                    if let Some(stop_reason) = event.delta.and_then(|d| d.stop_reason) {
                                        yield Ok(StreamChunk::Complete {
                                            input_tokens,
                                            output_tokens,
                                            stop_reason,
                                        });
                                    }
                                }
                                "error" => {
                                    let message = event.error
                                        .map(|e| e.message)
                                        .filter(|m| !m.is_empty())
                                        .unwrap_or_else(|| "Stream error".to_string());
                                    yield Ok(StreamChunk::Error { message });
                                }
                                _ => {}
                            }
                        }
                        Err(e) => {
                            // Log but don't yield error for parse failures
                            tracing::warn!("Failed to parse SSE event: {} - {}", e, data);
                        }
                    }
                }
            }
        }
    })
}
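  • Step 3: Add the async-stream dependency

Add to crates/zclaw-runtime/Cargo.toml (and make sure reqwest is built with its stream feature, which bytes_stream() requires):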

async-stream = "0.3"
tracing = "0.1"
  • Step 4: Verify that it compiles

Run: cd crates/zclaw-runtime && cargo check
Expected: The build succeeds

  • Step 5: Commit the Anthropic streaming implementation
git add crates/zclaw-runtime/src/driver/anthropic.rs crates/zclaw-runtime/Cargo.toml
git commit -m "feat(runtime): implement streaming for Anthropic driver

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>"
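
One detail worth isolating when parsing SSE by hand: a network chunk from bytes_stream() is not guaranteed to end on a line boundary, so a single data: line can arrive split across two chunks. The following is a minimal std-only sketch of a line buffer that returns only complete lines. SseLineBuffer and sse_data are hypothetical names introduced for illustration; they are not part of the plan or of any library.

```rust
/// Buffers raw bytes and hands back only complete lines.
/// Hypothetical helper for illustration; not part of the plan's code.
#[derive(Default)]
pub struct SseLineBuffer {
    pending: Vec<u8>,
}

impl SseLineBuffer {
    /// Appends one network chunk and returns every line it completed.
    pub fn push(&mut self, chunk: &[u8]) -> Vec<String> {
        self.pending.extend_from_slice(chunk);
        let mut lines = Vec::new();
        while let Some(pos) = self.pending.iter().position(|b| *b == b'\n') {
            // Take the line including its '\n', then strip "\n" or "\r\n".
            let raw: Vec<u8> = self.pending.drain(..=pos).collect();
            let line = String::from_utf8_lossy(&raw);
            lines.push(
                line.trim_end_matches(|c: char| c == '\n' || c == '\r')
                    .to_string(),
            );
        }
        lines
    }
}

/// Returns the payload of an SSE `data:` line, or None for any other line.
/// A single space after the colon is optional and is removed if present.
pub fn sse_data(line: &str) -> Option<&str> {
    line.strip_prefix("data:")
        .map(|rest| rest.strip_prefix(' ').unwrap_or(rest))
}
```

Buffering bytes rather than decoded text also avoids cutting a multi-byte UTF-8 character in half when a chunk boundary falls inside it.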

Task 2.2: Implement the OpenAI streaming API

Files:

  • Modify: crates/zclaw-runtime/src/driver/openai.rs

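  • Step 1: Read the existing OpenAI driver code

Run: cat crates/zclaw-runtime/src/driver/openai.rs | head -100
Expected: Review the structure of the existing implementation

  • Step 2: Add the OpenAI streaming implementation

Add a similar stream method to openai.rs, adapted to OpenAI's SSE format: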

fn stream(
    &self,
    request: CompletionRequest,
) -> Pin<Box<dyn Stream<Item = Result<StreamChunk>> + Send + '_>> {
    Box::pin(async_stream::stream! {
        // Assumes the OpenAI request type has a `stream` flag like the Anthropic one
        let mut stream_request = self.build_api_request(&request);
        stream_request.stream = true;

        let response = match self.client
            .post(format!("{}/v1/chat/completions", self.base_url))
            .header("Authorization", format!("Bearer {}", self.api_key.expose_secret()))
            .header("Content-Type", "application/json")
            .json(&stream_request)
            .send()
            .await
        {
            Ok(r) => r,
            Err(e) => {
                yield Err(ZclawError::LlmError(format!("HTTP request failed: {}", e)));
                return;
            }
        };

        if !response.status().is_success() {
            let status = response.status();
            let body = response.text().await.unwrap_or_default();
            yield Err(ZclawError::LlmError(format!("API error {}: {}", status, body)));
            return;
        }

        let mut byte_stream = response.bytes_stream();
        // Bytes received so far that do not yet form a complete line
        let mut line_buffer: Vec<u8> = Vec::new();

        while let Some(chunk_result) = byte_stream.next().await {
            let chunk = match chunk_result {
                Ok(c) => c,
                Err(e) => {
                    yield Err(ZclawError::LlmError(format!("Stream error: {}", e)));
                    continue;
                }
            };

            // A chunk can end mid-line, so parse only complete lines
            line_buffer.extend_from_slice(&chunk);
            let mut lines: Vec<String> = Vec::new();
            while let Some(pos) = line_buffer.iter().position(|b| *b == b'\n') {
                let raw: Vec<u8> = line_buffer.drain(..=pos).collect();
                lines.push(String::from_utf8_lossy(&raw).trim_end().to_string());
            }

            for line in &lines {
                if let Some(data) = line.strip_prefix("data: ") {
                    if data == "[DONE]" {
                        yield Ok(StreamChunk::Complete {
                            input_tokens: 0,
                            output_tokens: 0,
                            stop_reason: "end_turn".to_string(),
                        });
                        continue;
                    }

                    match serde_json::from_str::<OpenAiStreamResponse>(data) {
                        Ok(resp) => {
                            if let Some(choice) = resp.choices.first() {
                                // `delta` is a plain struct, not an Option
                                let delta = &choice.delta;
                                if let Some(content) = &delta.content {
                                    yield Ok(StreamChunk::TextDelta { delta: content.clone() });
                                }
                                if let Some(tool_calls) = &delta.tool_calls {
                                    for tc in tool_calls {
                                        if let Some(function) = &tc.function {
                                            yield Ok(StreamChunk::ToolUseDelta {
                                                id: tc.id.clone().unwrap_or_default(),
                                                delta: function.arguments.clone().unwrap_or_default(),
                                            });
                                        }
                                    }
                                }
                            }
                        }
                        Err(e) => {
                            tracing::warn!("Failed to parse OpenAI SSE: {}", e);
                        }
                    }
                }
            }
        }
    })
}
  • Step 3: Add the OpenAI streaming types
#[derive(Debug, Deserialize)]
struct OpenAiStreamResponse {
    choices: Vec<OpenAiStreamChoice>,
}

#[derive(Debug, Deserialize)]
struct OpenAiStreamChoice {
    delta: OpenAiDelta,
    finish_reason: Option<String>,
}

#[derive(Debug, Deserialize)]
struct OpenAiDelta {
    content: Option<String>,
    tool_calls: Option<Vec<OpenAiToolCallDelta>>,
}

#[derive(Debug, Deserialize)]
struct OpenAiToolCallDelta {
    id: Option<String>,
    function: Option<OpenAiFunctionDelta>,
}

#[derive(Debug, Deserialize)]
struct OpenAiFunctionDelta {
    arguments: Option<String>,
}
  • Step 4: Verify that it compiles

Run: cd crates/zclaw-runtime && cargo check
Expected: The build succeeds

  • Step 5: Commit the OpenAI streaming implementation
git add crates/zclaw-runtime/src/driver/openai.rs
git commit -m "feat(runtime): implement streaming for OpenAI driver

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>"
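
A point to keep in mind for the tool-call path: in OpenAI's streaming format, only the first fragment of a tool call carries its id and function name, while later fragments carry just an index and a piece of the arguments string. The sketch below shows one way to reassemble the fragments by index using only the standard library. ToolCallFragment, AssembledToolCall and ToolCallAssembler are hypothetical names, and the index and name fields are assumptions that the plan's OpenAiToolCallDelta does not yet declare.

```rust
use std::collections::BTreeMap;

/// One streamed tool-call fragment. Mirrors the optional fields of the
/// plan's `OpenAiToolCallDelta`, plus `index` and `name` (assumed here).
pub struct ToolCallFragment {
    pub index: u32,
    pub id: Option<String>,
    pub name: Option<String>,
    pub arguments: Option<String>,
}

/// A tool call reassembled from its fragments.
#[derive(Debug, Default, PartialEq)]
pub struct AssembledToolCall {
    pub id: String,
    pub name: String,
    /// Concatenated JSON text; parse it only once the stream has finished.
    pub arguments: String,
}

/// Collects fragments keyed by their position in the `tool_calls` array.
#[derive(Default)]
pub struct ToolCallAssembler {
    calls: BTreeMap<u32, AssembledToolCall>,
}

impl ToolCallAssembler {
    /// Merges one fragment into the call that shares its index.
    pub fn push(&mut self, fragment: ToolCallFragment) {
        let call = self.calls.entry(fragment.index).or_default();
        if let Some(id) = fragment.id {
            call.id = id;
        }
        if let Some(name) = fragment.name {
            call.name = name;
        }
        if let Some(arguments) = fragment.arguments {
            call.arguments.push_str(&arguments);
        }
    }

    /// Returns the assembled calls ordered by index.
    pub fn finish(self) -> Vec<AssembledToolCall> {
        self.calls.into_values().collect()
    }
}
```

With an assembler like this, the driver could emit ToolUseStart when an index first appears and ToolUseEnd with the parsed arguments once finish_reason arrives, which would match what the Anthropic driver produces.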

Chunk 3: Streaming Responses - LoopRunner and Tauri Integration

Task 3.1: Implement the LoopRunner streaming loop

Files:

  • Modify: crates/zclaw-runtime/src/loop_runner.rs

  • Step 1: Modify the run_streaming method

Replace the existing TODO placeholder:

use crate::stream::{StreamChunk, StreamEvent};
use tauri::{AppHandle, Emitter};

/// Run the agent loop with streaming
pub async fn run_streaming(
    &self,
    session_id: SessionId,
    input: String,
    app_handle: Option<AppHandle>,
) -> Result<mpsc::Receiver<LoopEvent>> {
    let (tx, rx) = mpsc::channel(100);

    // Add user message to session
    let user_message = Message::user(input);
    self.memory.append_message(&session_id, &user_message).await?;

    // Get all messages for context
    let messages = self.memory.get_messages(&session_id).await?;

    // Build completion request
    let request = CompletionRequest {
        model: self.model.clone(),
        system: self.system_prompt.clone(),
        messages,
        tools: self.tools.definitions(),
        max_tokens: Some(self.max_tokens),
        temperature: Some(self.temperature),
        stop: Vec::new(),
        stream: true,
    };

    // Clone necessary data for the async task
    let session_id_str = session_id.to_string();
    let agent_id_str = self.agent_id.to_string();
    let memory = self.memory.clone();
    let driver = self.driver.clone();

    tokio::spawn(async move {
        let mut full_response = String::new();
        let mut input_tokens = 0u32;
        let mut output_tokens = 0u32;

        let mut stream = driver.stream(request);

        use futures::StreamExt;

        while let Some(chunk_result) = stream.next().await {
            match chunk_result {
                Ok(chunk) => {
                    // Emit to Tauri if available
                    if let Some(ref handle) = app_handle {
                        let event = StreamEvent::new(&session_id_str, &agent_id_str, chunk.clone());
                        let _ = handle.emit("stream:chunk", event);
                    }

                    // Track response and tokens
                    match &chunk {
                        StreamChunk::TextDelta { delta } => {
                            full_response.push_str(delta);
                            let _ = tx.send(LoopEvent::Delta(delta.clone())).await;
                        }
                        StreamChunk::ThinkingDelta { delta } => {
                            let _ = tx.send(LoopEvent::Delta(format!("[思考] {}", delta))).await;
                        }
                        StreamChunk::ToolUseStart { name, .. } => {
                            let _ = tx.send(LoopEvent::ToolStart {
                                name: name.clone(),
                                input: serde_json::Value::Null,
                            }).await;
                        }
                        StreamChunk::ToolUseEnd { input, .. } => {
                            let _ = tx.send(LoopEvent::ToolEnd {
                                name: String::new(),
                                output: input.clone(),
                            }).await;
                        }
                        StreamChunk::Complete { input_tokens: it, output_tokens: ot, .. } => {
                            input_tokens = *it;
                            output_tokens = *ot;
                        }
                        StreamChunk::Error { message } => {
                            let _ = tx.send(LoopEvent::Error(message.clone())).await;
                        }
                        _ => {}
                    }
                }
                Err(e) => {
                    let _ = tx.send(LoopEvent::Error(e.to_string())).await;
                }
            }
        }

        // Save assistant message to memory
        let assistant_message = Message::assistant(full_response.clone());
        let _ = memory.append_message(&session_id, &assistant_message).await;

        // Send completion event
        let _ = tx.send(LoopEvent::Complete(AgentLoopResult {
            response: full_response,
            input_tokens,
            output_tokens,
            iterations: 1,
        })).await;
    });

    Ok(rx)
}
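  • Step 2: Add the tauri dependency (optional)

If the runtime crate needs to use tauri, add to Cargo.toml. Because the dependency is optional, the tauri imports and the app_handle parameter in loop_runner.rs must sit behind #[cfg(feature = "tauri")]; otherwise the crate does not build with the feature turned off.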

[dependencies]
tauri = { version = "2", optional = true }

[features]
default = []
tauri = ["dep:tauri"]
  • Step 3: Verify that it compiles

Run: cd crates/zclaw-runtime && cargo check
Expected: The build succeeds

  • Step 4: Commit the LoopRunner streaming implementation
git add crates/zclaw-runtime/src/loop_runner.rs crates/zclaw-runtime/Cargo.toml
git commit -m "feat(runtime): implement streaming in AgentLoop

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>"
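
The bookkeeping inside the streaming loop (concatenating text deltas, pairing each tool end with the name announced at its start, and recording the final token counts) can be exercised without a driver. The sketch below is a std-only illustration under simplifying assumptions: Chunk is a reduced stand-in for the plan's StreamChunk with tool input kept as a String, and StreamSummary and summarize are hypothetical names.

```rust
use std::collections::HashMap;

/// Reduced stand-in for the plan's `StreamChunk` (illustrative only).
pub enum Chunk {
    TextDelta(String),
    ToolUseStart { id: String, name: String },
    ToolUseEnd { id: String, input: String },
    Complete { input_tokens: u32, output_tokens: u32 },
}

/// What the loop knows once the stream has ended.
#[derive(Debug, Default, PartialEq)]
pub struct StreamSummary {
    pub text: String,
    /// (tool name, raw input) pairs in the order the calls completed.
    pub tool_calls: Vec<(String, String)>,
    pub input_tokens: u32,
    pub output_tokens: u32,
}

/// Folds a sequence of chunks into a summary.
pub fn summarize(chunks: impl IntoIterator<Item = Chunk>) -> StreamSummary {
    let mut summary = StreamSummary::default();
    // Tool names announced by ToolUseStart, keyed by tool-use id.
    let mut open_tools: HashMap<String, String> = HashMap::new();

    for chunk in chunks {
        match chunk {
            Chunk::TextDelta(delta) => summary.text.push_str(&delta),
            Chunk::ToolUseStart { id, name } => {
                open_tools.insert(id, name);
            }
            Chunk::ToolUseEnd { id, input } => {
                // Look up the name recorded at start; empty if none was seen.
                let name = open_tools.remove(&id).unwrap_or_default();
                summary.tool_calls.push((name, input));
            }
            Chunk::Complete { input_tokens, output_tokens } => {
                summary.input_tokens = input_tokens;
                summary.output_tokens = output_tokens;
            }
        }
    }
    summary
}
```

Keeping the id-to-name map is what would let a ToolEnd event carry the real tool name instead of an empty string.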

Task 3.2: Add the Tauri streaming command

Files:

  • Modify: desktop/src-tauri/src/lib.rs

  • Step 1: Add the streaming chat command

Add to the Tauri commands:

use zclaw_runtime::{AgentLoop, LoopEvent};
use tauri::{AppHandle, State};

#[tauri::command]
async fn chat_stream(
    agent_id: String,
    session_id: String,
    message: String,
    app: AppHandle,
    state: State<'_, KernelState>,
) -> Result<(), String> {
    let agent_loop = state.kernel
        .get_agent_loop(&agent_id)
        .await
        .map_err(|e| e.to_string())?;

    let session: zclaw_types::SessionId = session_id.parse()
        .map_err(|_| "Invalid session ID")?;

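    // run_streaming emits every chunk as a Tauri "stream:chunk" event, so this
    // command only starts the stream. The returned receiver is not read here;
    // dropping it is harmless because the loop ignores channel send errors.
    let _rx = agent_loop
        .run_streaming(session, message, Some(app))
        .await
        .map_err(|e| e.to_string())?;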

    Ok(())
}
  • Step 2: Register the command

Register it in the invoke_handler in lib.rs:

.invoke_handler(tauri::generate_handler![
    // ... existing commands
    chat_stream,
])
  • Step 3: Verify that it compiles

Run: cd desktop && pnpm tauri build --debug 2>&1 | head -50
Expected: The build starts; check the output for errors

  • Step 4: Commit the Tauri streaming command
git add desktop/src-tauri/src/lib.rs
git commit -m "feat(tauri): add chat_stream command for streaming

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>"

Chunk 4: Streaming Responses - Frontend Integration

Task 4.1: Create the streaming client

Files:

  • Create: desktop/src/lib/streaming-client.ts

  • Step 1: Create the streaming client class

// desktop/src/lib/streaming-client.ts
import { listen, type UnlistenFn } from '@tauri-apps/api/event';

export interface StreamChunk {
  type: 'text_delta' | 'thinking_delta' | 'tool_use_start' | 'tool_use_delta' | 'tool_use_end' | 'complete' | 'error';
  delta?: string;
  id?: string;
  name?: string;
  input?: unknown;
  input_tokens?: number;
  output_tokens?: number;
  stop_reason?: string;
  message?: string;
}

export interface StreamEvent {
  session_id: string;
  agent_id: string;
  chunk: StreamChunk;
}

export type StreamHandler = (chunk: StreamChunk) => void;

export class StreamingClient {
  private unlisten: UnlistenFn | null = null;
  private currentSessionId: string | null = null;
  private handlers: Map<string, StreamHandler[]> = new Map();

  async connect(): Promise<void> {
    this.unlisten = await listen<StreamEvent>('stream:chunk', (event) => {
      const { session_id, chunk } = event.payload;

      if (this.currentSessionId && session_id === this.currentSessionId) {
        const handlers = this.handlers.get(session_id) || [];
        handlers.forEach(handler => handler(chunk));
      }
    });
  }

  async disconnect(): Promise<void> {
    if (this.unlisten) {
      this.unlisten();
      this.unlisten = null;
    }
    this.handlers.clear();
  }

  subscribe(sessionId: string, handler: StreamHandler): () => void {
    this.currentSessionId = sessionId;

    if (!this.handlers.has(sessionId)) {
      this.handlers.set(sessionId, []);
    }
    this.handlers.get(sessionId)!.push(handler);

    // Return unsubscribe function
    return () => {
      const handlers = this.handlers.get(sessionId);
      if (handlers) {
        const index = handlers.indexOf(handler);
        if (index > -1) {
          handlers.splice(index, 1);
        }
      }
    };
  }

  clearSession(sessionId: string): void {
    this.handlers.delete(sessionId);
    if (this.currentSessionId === sessionId) {
      this.currentSessionId = null;
    }
  }
}

// Singleton instance
let streamingClient: StreamingClient | null = null;

export async function getStreamingClient(): Promise<StreamingClient> {
  if (!streamingClient) {
    streamingClient = new StreamingClient();
    await streamingClient.connect();
  }
  return streamingClient;
}
  • Step 2: Verify the TypeScript build

Run: cd desktop && pnpm tsc --noEmit
Expected: No type errors

  • Step 3: Commit the streaming client
git add desktop/src/lib/streaming-client.ts
git commit -m "feat(frontend): add streaming client for Tauri events

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>"

Task 4.2: Integrate into ChatStore

Files:

  • Modify: desktop/src/store/chatStore.ts

  • Step 1: Add streaming state and handling

import { getStreamingClient, StreamChunk } from '../lib/streaming-client';
import { invoke } from '@tauri-apps/api/core';

interface ChatState {
  // ... existing state
  streamingMessage: string;
  isStreaming: boolean;
  streamError: string | null;
}

// In store actions:
async sendStreamingMessage(content: string) {
  const sessionId = this.currentSessionId;
  const agentId = this.currentAgentId;

  if (!sessionId || !agentId) return;

  // Add user message immediately
  this.addMessage({
    role: 'user',
    content,
  });

  // Prepare for streaming
  this.streamingMessage = '';
  this.isStreaming = true;
  this.streamError = null;

  // Create placeholder assistant message
  const assistantMessageId = this.addMessage({
    role: 'assistant',
    content: '',
  });

  try {
    const client = await getStreamingClient();

    const unsubscribe = client.subscribe(sessionId, (chunk: StreamChunk) => {
      switch (chunk.type) {
        case 'text_delta':
          this.streamingMessage += chunk.delta || '';
          this.updateMessage(assistantMessageId, {
            content: this.streamingMessage,
          });
          break;

        case 'thinking_delta':
          // Handle thinking if needed
          break;

        case 'tool_use_start':
          this.addMessage({
            role: 'tool_use',
            content: '',
            toolName: chunk.name,
            toolId: chunk.id,
          });
          break;

        case 'complete':
          this.isStreaming = false;
          unsubscribe();
          break;

        case 'error':
          this.streamError = chunk.message || 'Unknown error';
          this.isStreaming = false;
          unsubscribe();
          break;
      }
    });

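    // Invoke the streaming command. If it fails, drop the subscription so the
    // handler does not stay registered for a stream that never started.
    try {
      await invoke('chat_stream', {
        agentId,
        sessionId,
        message: content,
      });
    } catch (error) {
      unsubscribe();
      throw error;
    }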

  } catch (error) {
    this.streamError = String(error);
    this.isStreaming = false;
  }
}
  • Step 2: Verify the TypeScript build

Run: cd desktop && pnpm tsc --noEmit
Expected: No type errors

  • Step 3: Commit the ChatStore streaming integration
git add desktop/src/store/chatStore.ts
git commit -m "feat(frontend): integrate streaming into ChatStore

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>"

Task 4.3: Update the chat UI component

Files:

  • Modify: desktop/src/components/ChatPanel.tsx (or the corresponding component)

  • Step 1: Add the streaming send logic

Add a streaming check to the message send handler:

const handleSend = async (content: string) => {
  if (useStreaming) {
    await chatStore.sendStreamingMessage(content);
  } else {
    await chatStore.sendMessage(content);
  }
};
  • Step 2: Add a streaming indicator
{chatStore.isStreaming && (
  <div className="flex items-center gap-2 text-sm text-gray-500">
    <span className="animate-pulse"></span>
    正在生成...
  </div>
)}
  • Step 3: Verify that it compiles

Run: cd desktop && pnpm tsc --noEmit
Expected: No type errors

  • Step 4: Commit the UI update
git add desktop/src/components/ChatPanel.tsx
git commit -m "feat(frontend): add streaming UI indicators

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>"

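Milestone M1: Streaming Responses Complete

  • Verify the streaming response feature

Run: pnpm desktop (launches the app)
Expected:

  1. After a message is sent, the reply appears incrementally
  2. An indicator is shown while streaming is in progress
  3. The state is correct once streaming completes
  • Tag and push the milestone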
git tag v0.2.0-m1-streaming
git push origin v0.2.0-m1-streaming

Chunk 5: MCP Protocol - Transport Layer and Type Definitions

Task 5.1: Create the MCP JSON-RPC types

Files:

  • Create: crates/zclaw-protocols/src/mcp_types.rs

  • Step 1: Create the MCP JSON-RPC types file

//! MCP JSON-RPC 2.0 types

use serde::{Deserialize, Serialize};
use std::collections::HashMap;

/// JSON-RPC 2.0 Request
#[derive(Debug, Clone, Serialize)]
pub struct JsonRpcRequest {
    pub jsonrpc: &'static str,
    pub id: u64,
    pub method: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub params: Option<serde_json::Value>,
}

impl JsonRpcRequest {
    pub fn new(id: u64, method: impl Into<String>) -> Self {
        Self {
            jsonrpc: "2.0",
            id,
            method: method.into(),
            params: None,
        }
    }

    pub fn with_params(mut self, params: serde_json::Value) -> Self {
        self.params = Some(params);
        self
    }
}

/// JSON-RPC 2.0 Response
#[derive(Debug, Clone, Deserialize)]
pub struct JsonRpcResponse {
    pub jsonrpc: String,
    pub id: u64,
    #[serde(default)]
    pub result: Option<serde_json::Value>,
    #[serde(default)]
    pub error: Option<JsonRpcError>,
}

/// JSON-RPC Error
#[derive(Debug, Clone, Deserialize)]
pub struct JsonRpcError {
    pub code: i32,
    pub message: String,
    #[serde(default)]
    pub data: Option<serde_json::Value>,
}

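/// MCP Initialize Request
/// (MCP field names are camelCase on the wire: protocolVersion, clientInfo)
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct InitializeRequest {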
    pub protocol_version: String,
    pub capabilities: ClientCapabilities,
    pub client_info: Implementation,
}

impl Default for InitializeRequest {
    fn default() -> Self {
        Self {
            protocol_version: "2024-11-05".to_string(),
            capabilities: ClientCapabilities::default(),
            client_info: Implementation {
                name: "zclaw".to_string(),
                version: env!("CARGO_PKG_VERSION").to_string(),
            },
        }
    }
}

/// Client capabilities
#[derive(Debug, Clone, Serialize, Default)]
pub struct ClientCapabilities {
    #[serde(skip_serializing_if = "Option::is_none")]
    pub roots: Option<RootsCapability>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub sampling: Option<SamplingCapability>,
}

#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct RootsCapability {
    /// Serialized as `listChanged`
    pub list_changed: bool,
}

#[derive(Debug, Clone, Serialize, Default)]
pub struct SamplingCapability {}

/// Server capabilities (from initialize response)
#[derive(Debug, Clone, Deserialize, Default)]
pub struct ServerCapabilities {
    #[serde(default)]
    pub tools: Option<ToolsCapability>,
    #[serde(default)]
    pub resources: Option<ResourcesCapability>,
    #[serde(default)]
    pub prompts: Option<PromptsCapability>,
    #[serde(default)]
    pub logging: Option<LoggingCapability>,
}

#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ToolsCapability {
    #[serde(default)]
    pub list_changed: bool,
}

#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ResourcesCapability {
    #[serde(default)]
    pub subscribe: bool,
    #[serde(default)]
    pub list_changed: bool,
}

#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PromptsCapability {
    #[serde(default)]
    pub list_changed: bool,
}

#[derive(Debug, Clone, Deserialize)]
pub struct LoggingCapability {}

/// Implementation info
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Implementation {
    pub name: String,
    pub version: String,
}

/// Initialize result
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct InitializeResult {
    pub protocol_version: String,
    pub capabilities: ServerCapabilities,
    pub server_info: Implementation,
}

/// Tool from tools/list (the schema arrives as `inputSchema`)
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Tool {
    pub name: String,
    #[serde(default)]
    pub description: Option<String>,
    pub input_schema: serde_json::Value,
}

/// Resource from resources/list (the MIME type arrives as `mimeType`)
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Resource {
    pub uri: String,
    pub name: String,
    #[serde(default)]
    pub description: Option<String>,
    #[serde(default)]
    pub mime_type: Option<String>,
}

/// Prompt from prompts/list
#[derive(Debug, Clone, Deserialize)]
pub struct Prompt {
    pub name: String,
    #[serde(default)]
    pub description: Option<String>,
    #[serde(default)]
    pub arguments: Vec<PromptArgument>,
}

#[derive(Debug, Clone, Deserialize)]
pub struct PromptArgument {
    pub name: String,
    #[serde(default)]
    pub description: Option<String>,
    #[serde(default)]
    pub required: bool,
}

/// Tool call result (the error flag arrives as `isError`)
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CallToolResult {
    pub content: Vec<ContentBlock>,
    #[serde(default)]
    pub is_error: bool,
}

/// Content block, discriminated by the `type` field ("text", "image", "resource")
#[derive(Debug, Clone, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ContentBlock {
    Text { text: String },
    Image {
        data: String,
        #[serde(rename = "mimeType")]
        mime_type: String,
    },
    Resource { resource: ResourceContent },
}

#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ResourceContent {
    pub uri: String,
    #[serde(default)]
    pub mime_type: Option<String>,
    #[serde(default)]
    pub text: Option<String>,
    #[serde(default)]
    pub blob: Option<String>,
}

/// Read resource result
#[derive(Debug, Clone, Deserialize)]
pub struct ReadResourceResult {
    pub contents: Vec<ResourceContent>,
}

/// Get prompt result
#[derive(Debug, Clone, Deserialize)]
pub struct GetPromptResult {
    #[serde(default)]
    pub description: Option<String>,
    pub messages: Vec<PromptMessage>,
}

#[derive(Debug, Clone, Deserialize)]
pub struct PromptMessage {
    pub role: String,
    pub content: ContentBlock,
}
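
MCP's JSON messages use camelCase keys such as `protocolVersion`, `clientInfo`, and `inputSchema`, whereas the Rust fields in these types are snake_case, so the structs need a rename rule (for example serde's `#[serde(rename_all = "camelCase")]`) for the two to line up. The std-only sketch below illustrates that mapping for plain snake_case names. `to_camel_case` is a hypothetical helper written for illustration; it is not part of serde or ZCLAW.

```rust
/// Convert a snake_case field name to the camelCase key used on the wire.
/// Hypothetical helper for illustration only.
fn to_camel_case(field: &str) -> String {
    let mut out = String::with_capacity(field.len());
    let mut upper_next = false;
    for ch in field.chars() {
        if ch == '_' {
            // Drop the underscore and capitalize the character after it.
            upper_next = true;
        } else if upper_next {
            out.extend(ch.to_uppercase());
            upper_next = false;
        } else {
            out.push(ch);
        }
    }
    out
}

fn main() {
    for field in ["protocol_version", "client_info", "input_schema", "is_error"] {
        println!("{field} -> {}", to_camel_case(field));
    }
}
```

A quick way to catch a missing rename rule is a unit test that deserializes a sample server payload into each type.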
  • Step 2: Verify that it compiles

Run: cd crates/zclaw-protocols && cargo check
Expected: compilation succeeds

  • Step 3: Commit the MCP type definitions
git add crates/zclaw-protocols/src/mcp_types.rs
git commit -m "feat(protocols): add MCP JSON-RPC type definitions

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>"

Task 5.2: Create the MCP transport layer

Files:

  • Create: crates/zclaw-protocols/src/mcp_transport.rs

  • Step 1: Create the transport trait and implementations

//! MCP Transport layer - stdio and HTTP

use async_trait::async_trait;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::process::{Child, ChildStdin, ChildStdout, Command};
use serde_json::Value;
use std::process::Stdio;
use zclaw_types::Result;
use zclaw_types::ZclawError;

use super::mcp_types::{JsonRpcRequest, JsonRpcResponse};

/// MCP Transport trait
#[async_trait]
pub trait McpTransport: Send + Sync {
    /// Send a request and receive response
    async fn request(&mut self, req: JsonRpcRequest) -> Result<JsonRpcResponse>;

    /// Send a notification (no response expected)
    async fn notify(&mut self, method: &str, params: Option<Value>) -> Result<()>;

    /// Close the transport
    async fn close(&mut self) -> Result<()>;
}

/// Stdio transport for local MCP servers
pub struct StdioTransport {
    process: Child,
    stdin: ChildStdin,
    stdout: BufReader<ChildStdout>,
    request_id: u64,
}

impl StdioTransport {
    /// Start a new MCP server process and connect via stdio
    pub async fn start(command: &str, args: &[&str]) -> Result<Self> {
        let mut cmd = Command::new(command);
        cmd.args(args)
            .stdin(Stdio::piped())
            .stdout(Stdio::piped())
            .stderr(Stdio::null());

        let mut process = cmd.spawn()
            .map_err(|e| ZclawError::McpError(format!("Failed to start MCP server: {}", e)))?;

        let stdin = process.stdin.take()
            .ok_or_else(|| ZclawError::McpError("Failed to open stdin".to_string()))?;

        let stdout = process.stdout.take()
            .ok_or_else(|| ZclawError::McpError("Failed to open stdout".to_string()))?;

        Ok(Self {
            process,
            stdin,
            stdout: BufReader::new(stdout),
            request_id: 0,
        })
    }

    fn next_request_id(&mut self) -> u64 {
        self.request_id += 1;
        self.request_id
    }
}

#[async_trait]
impl McpTransport for StdioTransport {
    async fn request(&mut self, req: JsonRpcRequest) -> Result<JsonRpcResponse> {
        // MCP's stdio transport frames each JSON-RPC message as one line of JSON
        // terminated by '\n' (not LSP-style Content-Length headers).
        // serde_json::to_string never emits raw newlines, so one line is one message.
        let mut line = serde_json::to_string(&req)
            .map_err(|e| ZclawError::McpError(format!("JSON serialize error: {}", e)))?;
        line.push('\n');

        self.stdin.write_all(line.as_bytes()).await
            .map_err(|e| ZclawError::McpError(format!("Write error: {}", e)))?;

        self.stdin.flush().await
            .map_err(|e| ZclawError::McpError(format!("Flush error: {}", e)))?;

        // Read the next non-empty line as the response.
        // Note: this does not yet skip server-initiated notifications or match ids.
        let mut response_line = String::new();
        loop {
            response_line.clear();
            let bytes_read = self.stdout.read_line(&mut response_line).await
                .map_err(|e| ZclawError::McpError(format!("Read error: {}", e)))?;

            if bytes_read == 0 {
                return Err(ZclawError::McpError("MCP server closed stdout".to_string()));
            }
            if !response_line.trim().is_empty() {
                break;
            }
        }

        let response: JsonRpcResponse = serde_json::from_str(response_line.trim())
            .map_err(|e| ZclawError::McpError(format!("JSON parse error: {}", e)))?;

        Ok(response)
    }

    async fn notify(&mut self, method: &str, params: Option<Value>) -> Result<()> {
        // A JSON-RPC notification carries no "id"; with an id the server would
        // treat it as a request and send a response that nobody reads.
        let mut message = serde_json::json!({ "jsonrpc": "2.0", "method": method });
        if let Some(params) = params {
            message["params"] = params;
        }

        let mut line = serde_json::to_string(&message)
            .map_err(|e| ZclawError::McpError(format!("JSON serialize error: {}", e)))?;
        line.push('\n');

        self.stdin.write_all(line.as_bytes()).await
            .map_err(|e| ZclawError::McpError(format!("Write error: {}", e)))?;

        self.stdin.flush().await
            .map_err(|e| ZclawError::McpError(format!("Flush error: {}", e)))?;

        Ok(())
    }

    async fn close(&mut self) -> Result<()> {
        let _ = self.process.kill().await;
        Ok(())
    }
}

/// HTTP transport for remote MCP servers
pub struct HttpTransport {
    base_url: String,
    client: reqwest::Client,
    request_id: u64,
}

impl HttpTransport {
    pub fn new(base_url: impl Into<String>) -> Self {
        Self {
            base_url: base_url.into(),
            client: reqwest::Client::new(),
            request_id: 0,
        }
    }

    fn next_request_id(&mut self) -> u64 {
        self.request_id += 1;
        self.request_id
    }
}

#[async_trait]
impl McpTransport for HttpTransport {
    async fn request(&mut self, req: JsonRpcRequest) -> Result<JsonRpcResponse> {
        let response = self.client
            .post(&self.base_url)
            .json(&req)
            .send()
            .await
            .map_err(|e| ZclawError::McpError(format!("HTTP request error: {}", e)))?;

        if !response.status().is_success() {
            return Err(ZclawError::McpError(format!("HTTP error: {}", response.status())));
        }

        let jsonrpc_response: JsonRpcResponse = response.json().await
            .map_err(|e| ZclawError::McpError(format!("JSON parse error: {}", e)))?;

        Ok(jsonrpc_response)
    }

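    async fn notify(&mut self, method: &str, params: Option<Value>) -> Result<()> {
        // Send the notification as a POST without an "id"; no JSON-RPC response
        // body is expected. Skipping this would leave `notifications/initialized`
        // unsent for HTTP servers.
        let mut message = serde_json::json!({ "jsonrpc": "2.0", "method": method });
        if let Some(params) = params {
            message["params"] = params;
        }

        let response = self.client
            .post(&self.base_url)
            .json(&message)
            .send()
            .await
            .map_err(|e| ZclawError::McpError(format!("HTTP request error: {}", e)))?;

        if !response.status().is_success() {
            return Err(ZclawError::McpError(format!("HTTP error: {}", response.status())));
        }

        Ok(())
    }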

    async fn close(&mut self) -> Result<()> {
        Ok(())
    }
}
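
The MCP specification's stdio transport delimits messages with newlines and forbids embedded newlines inside a message. The following is a minimal synchronous, std-only sketch of that framing, independent of the tokio types used in the transport; `write_message` and `read_message` are hypothetical names chosen for illustration.

```rust
use std::io::{self, BufRead, Write};

/// Write one JSON-RPC message as a single newline-terminated line.
fn write_message<W: Write>(writer: &mut W, json: &str) -> io::Result<()> {
    if json.contains('\n') {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "message must not contain embedded newlines",
        ));
    }
    writer.write_all(json.as_bytes())?;
    writer.write_all(b"\n")?;
    writer.flush()
}

/// Read the next non-empty line; `Ok(None)` means the peer closed the stream.
fn read_message<R: BufRead>(reader: &mut R) -> io::Result<Option<String>> {
    let mut line = String::new();
    loop {
        line.clear();
        if reader.read_line(&mut line)? == 0 {
            return Ok(None);
        }
        let trimmed = line.trim();
        if !trimmed.is_empty() {
            return Ok(Some(trimmed.to_string()));
        }
    }
}

fn main() -> io::Result<()> {
    // An in-memory buffer stands in for the child process pipes.
    let mut wire = Vec::new();
    write_message(&mut wire, r#"{"jsonrpc":"2.0","id":1,"method":"ping"}"#)?;
    write_message(&mut wire, r#"{"jsonrpc":"2.0","method":"notifications/initialized"}"#)?;

    let mut reader = io::Cursor::new(wire);
    while let Some(message) = read_message(&mut reader)? {
        println!("{message}");
    }
    Ok(())
}
```

Treating end-of-stream as a distinct outcome lets the caller tell a crashed server apart from a malformed message.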
  • Step 2: Add a McpError variant to ZclawError

Check crates/zclaw-types/src/error.rs and make sure it contains:

pub enum ZclawError {
    // ... existing variants
    McpError(String),
}
  • Step 3: Verify that it compiles

Run: cd crates/zclaw-protocols && cargo check
Expected: compilation succeeds

  • Step 4: Commit the transport layer implementation
git add crates/zclaw-protocols/src/mcp_transport.rs crates/zclaw-types/src/error.rs
git commit -m "feat(protocols): add MCP transport layer (stdio/HTTP)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>"

Chunk 6: MCP Protocol - Client Implementation

Task 6.1: Implement the full MCP client

Files:

  • Create: crates/zclaw-protocols/src/mcp_client.rs

  • Modify: crates/zclaw-protocols/src/mcp.rs

  • Modify: crates/zclaw-protocols/src/lib.rs

  • Step 1: Create the MCP client

//! MCP Client implementation

use async_trait::async_trait;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use zclaw_types::{Result, ZclawError};

use super::mcp_transport::McpTransport;
use super::mcp_types::*;
use super::{McpClient, McpTool, McpResource, McpPrompt, McpToolCallRequest, McpToolCallResponse, McpResourceContent};

/// MCP Client configuration
#[derive(Debug, Clone)]
pub struct McpClientConfig {
    pub name: String,
    pub transport_type: McpTransportType,
}

#[derive(Debug, Clone)]
pub enum McpTransportType {
    Stdio { command: String, args: Vec<String> },
    Http { url: String },
}

/// Full MCP client implementation
pub struct FullMcpClient {
    config: McpClientConfig,
    transport: Arc<RwLock<Box<dyn McpTransport>>>,
    server_capabilities: RwLock<Option<ServerCapabilities>>,
    server_info: RwLock<Option<Implementation>>,
    initialized: RwLock<bool>,
}

impl FullMcpClient {
    /// Create and initialize a new MCP client
    pub async fn connect(config: McpClientConfig) -> Result<Self> {
        let transport: Box<dyn McpTransport> = match &config.transport_type {
            McpTransportType::Stdio { command, args } => {
                let args_ref: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
                Box::new(
                    super::mcp_transport::StdioTransport::start(command, &args_ref).await?
                )
            }
            McpTransportType::Http { url } => {
                Box::new(super::mcp_transport::HttpTransport::new(url))
            }
        };

        let client = Self {
            config,
            transport: Arc::new(RwLock::new(transport)),
            server_capabilities: RwLock::new(None),
            server_info: RwLock::new(None),
            initialized: RwLock::new(false),
        };

        // Initialize connection
        client.initialize().await?;

        Ok(client)
    }

    /// Send initialize request
    async fn initialize(&self) -> Result<()> {
        let init_request = InitializeRequest::default();

        let request = JsonRpcRequest::new(1, "initialize")
            .with_params(serde_json::to_value(&init_request)
                .map_err(|e| ZclawError::McpError(format!("Serialize error: {}", e)))?);

        let mut transport = self.transport.write().await;
        let response = transport.request(request).await?;

        if let Some(error) = response.error {
            return Err(ZclawError::McpError(format!("Initialize error: {}", error.message)));
        }

        // `serde_json::Value` has no `.deserialize()` method; use `from_value`.
        let result: InitializeResult = serde_json::from_value(
            response.result
                .ok_or_else(|| ZclawError::McpError("No result in initialize response".to_string()))?,
        )
        .map_err(|e| ZclawError::McpError(format!("Parse error: {}", e)))?;

        // Store server info
        *self.server_capabilities.write().await = Some(result.capabilities);
        *self.server_info.write().await = Some(result.server_info);

        // Send initialized notification
        transport.notify("notifications/initialized", None).await?;

        *self.initialized.write().await = true;

        Ok(())
    }

    /// Check if server supports tools
    pub async fn supports_tools(&self) -> bool {
        self.server_capabilities.read().await
            .as_ref()
            .map(|c| c.tools.is_some())
            .unwrap_or(false)
    }

    /// Check if server supports resources
    pub async fn supports_resources(&self) -> bool {
        self.server_capabilities.read().await
            .as_ref()
            .map(|c| c.resources.is_some())
            .unwrap_or(false)
    }

    /// Check if server supports prompts
    pub async fn supports_prompts(&self) -> bool {
        self.server_capabilities.read().await
            .as_ref()
            .map(|c| c.prompts.is_some())
            .unwrap_or(false)
    }
}

#[async_trait]
impl McpClient for FullMcpClient {
    async fn list_tools(&self) -> Result<Vec<McpTool>> {
        if !self.supports_tools().await {
            return Ok(Vec::new());
        }

        let request = JsonRpcRequest::new(2, "tools/list");
        let response = self.transport.write().await.request(request).await?;

        if let Some(error) = response.error {
            return Err(ZclawError::McpError(format!("List tools error: {}", error.message)));
        }

        #[derive(serde::Deserialize)]
        struct ToolsListResult {
            tools: Vec<Tool>,
        }

        let result: ToolsListResult = serde_json::from_value(
            response.result
                .ok_or_else(|| ZclawError::McpError("No result".to_string()))?,
        )
        .map_err(|e| ZclawError::McpError(format!("Parse error: {}", e)))?;

        Ok(result.tools.into_iter().map(|t| McpTool {
            name: t.name,
            description: t.description.unwrap_or_default(),
            input_schema: t.input_schema,
        }).collect())
    }

    async fn call_tool(&self, request: McpToolCallRequest) -> Result<McpToolCallResponse> {
        let params = serde_json::json!({
            "name": request.name,
            "arguments": request.arguments,
        });

        let rpc_request = JsonRpcRequest::new(3, "tools/call").with_params(params);
        let response = self.transport.write().await.request(rpc_request).await?;

        if let Some(error) = response.error {
            return Err(ZclawError::McpError(format!("Call tool error: {}", error.message)));
        }

        let result: CallToolResult = serde_json::from_value(
            response.result
                .ok_or_else(|| ZclawError::McpError("No result".to_string()))?,
        )
        .map_err(|e| ZclawError::McpError(format!("Parse error: {}", e)))?;

        Ok(McpToolCallResponse {
            content: result.content.into_iter().map(|c| match c {
                ContentBlock::Text { text } => super::McpContent::Text { text },
                ContentBlock::Image { data, mime_type } => super::McpContent::Image { data, mime_type },
                ContentBlock::Resource { resource } => super::McpContent::Resource {
                    resource: McpResourceContent {
                        uri: resource.uri,
                        mime_type: resource.mime_type,
                        text: resource.text,
                        blob: resource.blob,
                    },
                },
            }).collect(),
            is_error: result.is_error,
        })
    }

    async fn list_resources(&self) -> Result<Vec<McpResource>> {
        if !self.supports_resources().await {
            return Ok(Vec::new());
        }

        let request = JsonRpcRequest::new(4, "resources/list");
        let response = self.transport.write().await.request(request).await?;

        if let Some(error) = response.error {
            return Err(ZclawError::McpError(format!("List resources error: {}", error.message)));
        }

        #[derive(serde::Deserialize)]
        struct ResourcesListResult {
            resources: Vec<Resource>,
        }

        let result: ResourcesListResult = serde_json::from_value(
            response.result
                .ok_or_else(|| ZclawError::McpError("No result".to_string()))?,
        )
        .map_err(|e| ZclawError::McpError(format!("Parse error: {}", e)))?;

        Ok(result.resources.into_iter().map(|r| McpResource {
            uri: r.uri,
            name: r.name,
            description: r.description,
            mime_type: r.mime_type,
        }).collect())
    }

    async fn read_resource(&self, uri: &str) -> Result<McpResourceContent> {
        let params = serde_json::json!({
            "uri": uri,
        });

        let request = JsonRpcRequest::new(5, "resources/read").with_params(params);
        let response = self.transport.write().await.request(request).await?;

        if let Some(error) = response.error {
            return Err(ZclawError::McpError(format!("Read resource error: {}", error.message)));
        }

        let result: ReadResourceResult = serde_json::from_value(
            response.result
                .ok_or_else(|| ZclawError::McpError("No result".to_string()))?,
        )
        .map_err(|e| ZclawError::McpError(format!("Parse error: {}", e)))?;

        let content = result.contents.into_iter().next()
            .ok_or_else(|| ZclawError::McpError("No content".to_string()))?;

        Ok(McpResourceContent {
            uri: content.uri,
            mime_type: content.mime_type,
            text: content.text,
            blob: content.blob,
        })
    }

    async fn list_prompts(&self) -> Result<Vec<McpPrompt>> {
        if !self.supports_prompts().await {
            return Ok(Vec::new());
        }

        let request = JsonRpcRequest::new(6, "prompts/list");
        let response = self.transport.write().await.request(request).await?;

        if let Some(error) = response.error {
            return Err(ZclawError::McpError(format!("List prompts error: {}", error.message)));
        }

        #[derive(serde::Deserialize)]
        struct PromptsListResult {
            prompts: Vec<Prompt>,
        }

        let result: PromptsListResult = serde_json::from_value(
            response.result
                .ok_or_else(|| ZclawError::McpError("No result".to_string()))?,
        )
        .map_err(|e| ZclawError::McpError(format!("Parse error: {}", e)))?;

        Ok(result.prompts.into_iter().map(|p| McpPrompt {
            name: p.name,
            description: p.description.unwrap_or_default(),
            arguments: p.arguments.into_iter().map(|a| super::McpPromptArgument {
                name: a.name,
                description: a.description.unwrap_or_default(),
                required: a.required,
            }).collect(),
        }).collect())
    }

    async fn get_prompt(&self, name: &str, arguments: HashMap<String, String>) -> Result<String> {
        let params = serde_json::json!({
            "name": name,
            "arguments": arguments,
        });

        let request = JsonRpcRequest::new(7, "prompts/get").with_params(params);
        let response = self.transport.write().await.request(request).await?;

        if let Some(error) = response.error {
            return Err(ZclawError::McpError(format!("Get prompt error: {}", error.message)));
        }

        let result: GetPromptResult = serde_json::from_value(
            response.result
                .ok_or_else(|| ZclawError::McpError("No result".to_string()))?,
        )
        .map_err(|e| ZclawError::McpError(format!("Parse error: {}", e)))?;

        // Convert messages to string
        let prompt_text: String = result.messages.into_iter()
            .filter_map(|m| match m.content {
                ContentBlock::Text { text } => Some(format!("{}: {}", m.role, text)),
                _ => None,
            })
            .collect::<Vec<_>>()
            .join("\n");

        Ok(prompt_text)
    }
}
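
The client above uses fixed request ids (1 through 7), which repeat whenever the same method is called more than once. JSON-RPC matches responses to requests by id, so a per-client counter is a common alternative. The sketch below is std-only; `RequestIdAllocator` is a hypothetical type that does not appear in the plan.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hands out unique, monotonically increasing JSON-RPC request ids.
/// It works through `&self`, so it fits methods that do not take `&mut self`.
#[derive(Debug, Default)]
struct RequestIdAllocator {
    next: AtomicU64,
}

impl RequestIdAllocator {
    fn new() -> Self {
        Self::default()
    }

    /// Return the next id, starting at 1.
    fn next_id(&self) -> u64 {
        self.next.fetch_add(1, Ordering::Relaxed) + 1
    }
}

fn main() {
    let ids = RequestIdAllocator::new();
    let initialize_id = ids.next_id();
    let list_tools_id = ids.next_id();
    println!("initialize={initialize_id} tools/list={list_tools_id}");
}
```

Storing such an allocator as a field on the client would let each trait method request a fresh id without extra locking.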
  • Step 2: Update the lib.rs exports
// crates/zclaw-protocols/src/lib.rs
pub mod mcp;
pub mod mcp_types;
pub mod mcp_transport;
pub mod mcp_client;
pub mod a2a;

pub use mcp::*;
pub use mcp_client::{FullMcpClient, McpClientConfig, McpTransportType};
  • Step 3: Verify that it compiles

Run: cd crates/zclaw-protocols && cargo check
Expected: compilation succeeds

  • Step 4: Commit the MCP client implementation
git add crates/zclaw-protocols/src/mcp_client.rs crates/zclaw-protocols/src/lib.rs
git commit -m "feat(protocols): implement full MCP client with tools/resources/prompts

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>"

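Milestone M2: MCP Protocol Complete

  • Verify MCP functionality

Create a test script that connects to filesystem-mcp:

# Install filesystem-mcp
npm install -g @modelcontextprotocol/server-filesystem

# Run the tests
cargo test --package zclaw-protocols --test mcp_test

Expected: connects successfully and lists files

  • Tag the milestone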
git tag v0.2.0-m2-mcp
git push origin v0.2.0-m2-mcp

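Chunk 7: Browser Hand Implementation

Task 7.1: Add the browser automation dependency

Files:

  • Modify: crates/zclaw-hands/Cargo.toml

  • Step 1: Add the chromiumoxide dependency

[dependencies]
# ... existing dependencies
# Default features select the async-std runtime, so disable them when using tokio.
chromiumoxide = { version = "0.7", default-features = false, features = ["tokio-runtime"] }
tokio-stream = "0.1"

Note: chromiumoxide is used in place of playwright-rust because its API is more stable.

  • Step 2: Verify that the dependencies download

Run: cd crates/zclaw-hands && cargo fetch
Expected: dependencies download successfully

  • Step 3: Commit the dependency update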
git add crates/zclaw-hands/Cargo.toml crates/zclaw-hands/Cargo.lock
git commit -m "chore(hands): add chromiumoxide dependency for Browser Hand

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>"

Task 7.2: Implement Browser Hand

Files:

  • Create: crates/zclaw-hands/src/hands/browser.rs

  • Modify: crates/zclaw-hands/src/hands/mod.rs

  • Step 1: Create the Browser Hand implementation

//! Browser Hand - Browser automation capability

use async_trait::async_trait;
use chromiumoxide::{Browser, BrowserConfig};
use serde::Deserialize;
use serde_json::{json, Value};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use zclaw_types::{Result, ZclawError};

use crate::hand::{Hand, HandConfig, HandContext, HandResult};

/// Browser Hand - provides browser automation capabilities
pub struct BrowserHand {
    config: HandConfig,
    browser: Arc<Mutex<Option<Browser>>>,
    page_url: Arc<Mutex<Option<String>>>,
}

impl BrowserHand {
    pub fn new() -> Self {
        Self {
            config: HandConfig {
                id: "browser".to_string(),
                name: "Browser".to_string(),
                description: "浏览器自动化能力包 - 自动化网页操作和数据采集".to_string(),
                needs_approval: true,
                dependencies: vec!["chromium".to_string()],
                input_schema: Some(json!({
                    "type": "object",
                    "properties": {
                        "action": {
                            "type": "string",
                            "enum": ["navigate", "click", "input", "screenshot", "wait", "evaluate", "close"]
                        },
                        "url": { "type": "string", "description": "URL to navigate to" },
                        "selector": { "type": "string", "description": "CSS selector" },
                        "text": { "type": "string", "description": "Text to input" },
                        "timeout": { "type": "number", "description": "Timeout in milliseconds" },
                        "script": { "type": "string", "description": "JavaScript to evaluate" },
                    },
                    "required": ["action"]
                })),
                tags: vec!["browser".to_string(), "automation".to_string(), "web-scraping".to_string()],
                enabled: true,
            },
            browser: Arc::new(Mutex::new(None)),
            page_url: Arc::new(Mutex::new(None)),
        }
    }

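    /// Launch the browser on first use and return a guard that dereferences to it.
    ///
    /// `Browser::launch` returns the browser together with a `Handler` stream that
    /// must be polled for the browser to make progress, so it is driven on a
    /// background task. `Browser` is not `Clone`, so callers borrow it through the
    /// mutex guard instead of receiving a copy.
    async fn ensure_browser(&self) -> Result<tokio::sync::MappedMutexGuard<'_, Browser>> {
        use tokio_stream::StreamExt;

        let mut browser_guard = self.browser.lock().await;

        if browser_guard.is_none() {
            let config = BrowserConfig::builder()
                .window_size(1920, 1080)
                .request_timeout(Duration::from_secs(30))
                .build()
                .map_err(|e| ZclawError::HandError(format!("Browser config error: {}", e)))?;

            let (browser, mut handler) = Browser::launch(config)
                .await
                .map_err(|e| ZclawError::HandError(format!("Browser launch error: {}", e)))?;

            // Drive the CDP event loop until the connection ends or fails.
            tokio::spawn(async move {
                while let Some(event) = handler.next().await {
                    if event.is_err() {
                        break;
                    }
                }
            });

            *browser_guard = Some(browser);
        }

        Ok(tokio::sync::MutexGuard::map(browser_guard, |slot| {
            slot.as_mut().expect("browser is initialized above")
        }))
    }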

    async fn navigate(&self, url: &str) -> Result<Value> {
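        let browser = self.ensure_browser().await?;

        // Reuse the existing page when there is one, so later actions
        // (click, input, screenshot) operate on the page that was navigated
        // rather than on an older tab.
        let existing = browser.pages().await
            .map_err(|e| ZclawError::HandError(format!("Get pages error: {}", e)))?
            .into_iter()
            .next();
        let page = match existing {
            Some(page) => page,
            None => browser.new_page("about:blank").await
                .map_err(|e| ZclawError::HandError(format!("Page create error: {}", e)))?,
        };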

        page.goto(url).await
            .map_err(|e| ZclawError::HandError(format!("Navigate error: {}", e)))?;

        page.wait_for_navigation().await
            .map_err(|e| ZclawError::HandError(format!("Wait navigation error: {}", e)))?;

        let title = page.get_title().await
            .map_err(|e| ZclawError::HandError(format!("Get title error: {}", e)))?;

        *self.page_url.lock().await = Some(url.to_string());

        Ok(json!({
            "success": true,
            "url": url,
            "title": title.unwrap_or_default()
        }))
    }

    async fn click(&self, selector: &str) -> Result<Value> {
        let browser = self.ensure_browser().await?;

        let pages = browser.pages().await
            .map_err(|e| ZclawError::HandError(format!("Get pages error: {}", e)))?;

        let page = pages.first()
            .ok_or_else(|| ZclawError::HandError("No active page".to_string()))?;

        page.find_element(selector).await
            .map_err(|e| ZclawError::HandError(format!("Find element error: {}", e)))?
            .click().await
            .map_err(|e| ZclawError::HandError(format!("Click error: {}", e)))?;

        Ok(json!({
            "success": true,
            "selector": selector
        }))
    }

    async fn input_text(&self, selector: &str, text: &str) -> Result<Value> {
        let browser = self.ensure_browser().await?;

        let pages = browser.pages().await
            .map_err(|e| ZclawError::HandError(format!("Get pages error: {}", e)))?;

        let page = pages.first()
            .ok_or_else(|| ZclawError::HandError("No active page".to_string()))?;

        page.find_element(selector).await
            .map_err(|e| ZclawError::HandError(format!("Find element error: {}", e)))?
            .type_str(text).await
            .map_err(|e| ZclawError::HandError(format!("Input error: {}", e)))?;

        Ok(json!({
            "success": true,
            "selector": selector,
            "input_length": text.len()
        }))
    }

    async fn screenshot(&self) -> Result<Value> {
        let browser = self.ensure_browser().await?;

        let pages = browser.pages().await
            .map_err(|e| ZclawError::HandError(format!("Get pages error: {}", e)))?;

        let page = pages.first()
            .ok_or_else(|| ZclawError::HandError("No active page".to_string()))?;

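        // chromiumoxide exposes these as `ScreenshotParams` and `CaptureScreenshotFormat`.
        let params = chromiumoxide::page::ScreenshotParams::builder()
            .format(chromiumoxide::cdp::browser_protocol::page::CaptureScreenshotFormat::Png)
            .build();

        let screenshot = page.screenshot(params).await
            .map_err(|e| ZclawError::HandError(format!("Screenshot error: {}", e)))?;

        // `base64::encode` is deprecated; use the engine API instead.
        use base64::Engine as _;
        let base64 = base64::engine::general_purpose::STANDARD.encode(&screenshot);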

        Ok(json!({
            "success": true,
            "format": "png",
            "data": base64,
            "size_bytes": screenshot.len()
        }))
    }

    async fn wait_for(&self, selector: &str, timeout_ms: u64) -> Result<Value> {
        let browser = self.ensure_browser().await?;

        let pages = browser.pages().await
            .map_err(|e| ZclawError::HandError(format!("Get pages error: {}", e)))?;

        let page = pages.first()
            .ok_or_else(|| ZclawError::HandError("No active page".to_string()))?;

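        // chromiumoxide has no built-in wait-for-selector call, so poll
        // `find_element` until it succeeds or the timeout elapses.
        tokio::time::timeout(Duration::from_millis(timeout_ms), async {
            loop {
                if page.find_element(selector).await.is_ok() {
                    break;
                }
                tokio::time::sleep(Duration::from_millis(100)).await;
            }
        })
        .await
        .map_err(|_| ZclawError::HandError(format!("Wait timeout for: {}", selector)))?;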

        Ok(json!({
            "success": true,
            "selector": selector
        }))
    }

    async fn evaluate(&self, script: &str) -> Result<Value> {
        let browser = self.ensure_browser().await?;

        let pages = browser.pages().await
            .map_err(|e| ZclawError::HandError(format!("Get pages error: {}", e)))?;

        let page = pages.first()
            .ok_or_else(|| ZclawError::HandError("No active page".to_string()))?;

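        let result = page.evaluate(script).await
            .map_err(|e| ZclawError::HandError(format!("Evaluate error: {}", e)))?;

        // `EvaluationResult` is not itself serializable; extract its JSON value
        // (absent for results such as `undefined`).
        let value = result.value().cloned().unwrap_or(Value::Null);

        Ok(json!({
            "success": true,
            "result": value
        }))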
    }

    async fn close(&self) -> Result<Value> {
        let mut browser_guard = self.browser.lock().await;

        // `Browser::close` takes `&mut self`, so bind the taken browser mutably.
        if let Some(mut browser) = browser_guard.take() {
            browser.close().await
                .map_err(|e| ZclawError::HandError(format!("Close error: {}", e)))?;
        }

        *self.page_url.lock().await = None;

        Ok(json!({ "success": true }))
    }
}

impl Default for BrowserHand {
    fn default() -> Self {
        Self::new()
    }
}

#[async_trait]
impl Hand for BrowserHand {
    fn config(&self) -> &HandConfig {
        &self.config
    }

    async fn execute(&self, _context: &HandContext, input: Value) -> Result<HandResult> {
        #[derive(Deserialize)]
        struct BrowserInput {
            action: String,
            url: Option<String>,
            selector: Option<String>,
            text: Option<String>,
            timeout: Option<u64>,
            script: Option<String>,
        }

        let input: BrowserInput = serde_json::from_value(input)
            .map_err(|e| ZclawError::HandError(format!("Invalid input: {}", e)))?;

        let result = match input.action.as_str() {
            "navigate" => {
                let url = input.url.ok_or_else(||
                    ZclawError::HandError("URL required for navigate".to_string()))?;
                self.navigate(&url).await?
            }
            "click" => {
                let selector = input.selector.ok_or_else(||
                    ZclawError::HandError("Selector required for click".to_string()))?;
                self.click(&selector).await?
            }
            "input" => {
                let selector = input.selector.ok_or_else(||
                    ZclawError::HandError("Selector required for input".to_string()))?;
                let text = input.text.ok_or_else(||
                    ZclawError::HandError("Text required for input".to_string()))?;
                self.input_text(&selector, &text).await?
            }
            "screenshot" => self.screenshot().await?,
            "wait" => {
                let selector = input.selector.ok_or_else(||
                    ZclawError::HandError("Selector required for wait".to_string()))?;
                let timeout = input.timeout.unwrap_or(5000);
                self.wait_for(&selector, timeout).await?
            }
            "evaluate" => {
                let script = input.script.ok_or_else(||
                    ZclawError::HandError("Script required for evaluate".to_string()))?;
                self.evaluate(&script).await?
            }
            "close" => self.close().await?,
            _ => {
                return Ok(HandResult::error(format!("Unknown action: {}", input.action)));
            }
        };

        Ok(HandResult::success(result))
    }

    fn is_dependency_available(&self, dep: &str) -> bool {
        match dep {
            "chromium" => {
                // Check if chromium/chrome is available
                std::process::Command::new("chromium")
                    .arg("--version")
                    .output()
                    .is_ok() ||
                std::process::Command::new("chrome")
                    .arg("--version")
                    .output()
                    .is_ok()
            }
            _ => true
        }
    }
}
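The dispatch above checks each action's required fields inside the `match`. One alternative, sketched here under assumptions, is to convert the loosely typed input into an enum first, so the dispatcher only ever sees complete actions. `RawInput`, `BrowserAction`, `require`, and `parse_action` are hypothetical names that do not appear in the plan, and errors are plain strings rather than `ZclawError`.

```rust
/// Hypothetical, loosely typed input mirroring the fields of `BrowserInput`.
#[derive(Debug, Default, Clone)]
pub struct RawInput {
    pub action: String,
    pub url: Option<String>,
    pub selector: Option<String>,
    pub text: Option<String>,
    pub timeout: Option<u64>,
    pub script: Option<String>,
}

/// Hypothetical enum: each variant carries exactly the fields its action needs.
#[derive(Debug, PartialEq)]
pub enum BrowserAction {
    Navigate { url: String },
    Click { selector: String },
    Input { selector: String, text: String },
    Screenshot,
    Wait { selector: String, timeout_ms: u64 },
    Evaluate { script: String },
    Close,
}

/// Turn a missing optional field into a descriptive error.
fn require(field: Option<String>, name: &str, action: &str) -> Result<String, String> {
    field.ok_or_else(|| format!("{} required for {}", name, action))
}

/// Validate the raw input once, before any browser call is made.
pub fn parse_action(raw: RawInput) -> Result<BrowserAction, String> {
    let action = raw.action.as_str();
    match action {
        "navigate" => Ok(BrowserAction::Navigate {
            url: require(raw.url, "URL", action)?,
        }),
        "click" => Ok(BrowserAction::Click {
            selector: require(raw.selector, "Selector", action)?,
        }),
        "input" => Ok(BrowserAction::Input {
            selector: require(raw.selector, "Selector", action)?,
            text: require(raw.text, "Text", action)?,
        }),
        "screenshot" => Ok(BrowserAction::Screenshot),
        "wait" => Ok(BrowserAction::Wait {
            selector: require(raw.selector, "Selector", action)?,
            // Same 5000 ms default as the dispatch code above
            timeout_ms: raw.timeout.unwrap_or(5000),
        }),
        "evaluate" => Ok(BrowserAction::Evaluate {
            script: require(raw.script, "Script", action)?,
        }),
        "close" => Ok(BrowserAction::Close),
        other => Err(format!("Unknown action: {}", other)),
    }
}
```

With this shape, a missing field and an unknown action are both reported through the same `Err` path, and the dispatcher can match on `BrowserAction` without further `Option` handling.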
  • Step 2: Add the base64 dependency

Add to crates/zclaw-hands/Cargo.toml:

base64 = "0.22"
  • Step 3: Update the mod.rs exports
// crates/zclaw-hands/src/hands/mod.rs
//! Hands - Autonomous capabilities

mod whiteboard;
mod slideshow;
mod speech;
mod quiz;
mod browser;

pub use whiteboard::*;
pub use slideshow::*;
pub use speech::*;
pub use quiz::*;
pub use browser::*;
  • Step 4: Add a HandError variant to ZclawError

Make sure crates/zclaw-types/src/error.rs contains:

HandError(String),
  • Step 5: Verify that it compiles

Run: cd crates/zclaw-hands && cargo check
Expected: compiles successfully

  • Step 6: Commit the Browser Hand implementation
git add crates/zclaw-hands/src/hands/browser.rs crates/zclaw-hands/src/hands/mod.rs crates/zclaw-hands/Cargo.toml crates/zclaw-types/src/error.rs
git commit -m "feat(hands): implement Browser Hand with chromiumoxide

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>"

Milestone M3: Browser Hand complete

  • Verify Browser Hand functionality

Run: cd crates/zclaw-hands && cargo test browser_hand -- --nocapture
Expected: tests pass

  • Tag and push the milestone
git tag v0.2.0-m3-browser
git push origin v0.2.0-m3-browser

Chunk 8: Tool Security + Release Preparation

Task 8.1: Create the security configuration file

Files:

  • Create: config/security.toml

  • Step 1: Create the security configuration

# ZCLAW Security Configuration
# Controls which commands and operations are allowed

[shell_exec]
# Enable shell command execution
enabled = true
# Default timeout in seconds
default_timeout = 60
# Maximum output size in bytes
max_output_size = 1048576  # 1MB

# Whitelist of allowed commands
# If whitelist is non-empty, only these commands are allowed
allowed_commands = [
    "git",
    "npm",
    "pnpm",
    "node",
    "cargo",
    "rustc",
    "python",
    "python3",
    "pip",
    "ls",
    "cat",
    "echo",
    "mkdir",
    "rm",
    "cp",
    "mv",
    "grep",
    "find",
    "head",
    "tail",
    "wc",
]

# Blacklist of dangerous commands (always blocked)
blocked_commands = [
    "rm -rf /",
    "dd",
    "mkfs",
    "format",
    "shutdown",
    "reboot",
    "init",
    "systemctl",
]

[file_read]
enabled = true
# Allowed directory prefixes (empty = allow all)
allowed_paths = []
# Blocked paths (always blocked)
blocked_paths = [
    "/etc/shadow",
    "/etc/passwd",
    "~/.ssh",
    "~/.gnupg",
]

[file_write]
enabled = true
# Maximum file size in bytes (10MB)
max_file_size = 10485760
# Blocked paths
blocked_paths = [
    "/etc",
    "/usr",
    "/bin",
    "/sbin",
    "C:\\Windows",
    "C:\\Program Files",
]

[web_fetch]
enabled = true
# Request timeout in seconds
timeout = 30
# Maximum response size in bytes (10MB)
max_response_size = 10485760
# Block internal/private IP ranges (SSRF protection)
block_private_ips = true
# Allowed domains (empty = allow all)
allowed_domains = []
# Blocked domains
blocked_domains = []
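The `block_private_ips` switch implies an address check somewhere in the web_fetch tool. A minimal sketch of such a check, using only the standard library, might look like the following. The function names are hypothetical, and the list of ranges is an assumption about what "internal/private" should cover rather than something the plan specifies.

```rust
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};

/// Returns true when `ip` is a loopback, private, link-local, or otherwise
/// non-public address that a fetch tool should refuse to contact.
pub fn is_internal_ip(ip: IpAddr) -> bool {
    match ip {
        IpAddr::V4(v4) => is_internal_v4(v4),
        IpAddr::V6(v6) => is_internal_v6(v6),
    }
}

fn is_internal_v4(ip: Ipv4Addr) -> bool {
    ip.is_loopback()           // 127.0.0.0/8
        || ip.is_private()     // 10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16
        || ip.is_link_local()  // 169.254.0.0/16
        || ip.is_unspecified() // 0.0.0.0
        || ip.is_broadcast()   // 255.255.255.255
}

fn is_internal_v6(ip: Ipv6Addr) -> bool {
    // IPv4-mapped addresses (::ffff:a.b.c.d) are judged by the embedded IPv4 address
    if let Some(v4) = ip.to_ipv4_mapped() {
        return is_internal_v4(v4);
    }
    let first = ip.segments()[0];
    ip.is_loopback()                   // ::1
        || ip.is_unspecified()         // ::
        || (first & 0xfe00) == 0xfc00  // unique local, fc00::/7
        || (first & 0xffc0) == 0xfe80  // link-local, fe80::/10
}
```

A check like this is most useful when applied to the address a hostname actually resolves to, not to the hostname string, since a public-looking domain can resolve to an internal address.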
  • Step 2: Commit the security configuration
git add config/security.toml
git commit -m "feat(config): add security configuration file

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>"
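The `blocked_paths` entries in config/security.toml mix absolute paths with `~`-relative ones, so whatever enforces them has to expand `~` and compare whole path components. The sketch below shows one way to do that with the standard library only. `expand_tilde`, `normalize`, and `is_path_blocked` are hypothetical helpers, the home directory is passed in explicitly as an assumption, and symlinks are not resolved.

```rust
use std::path::{Component, Path, PathBuf};

/// Expand a leading `~` using the supplied home directory.
pub fn expand_tilde(path: &str, home: &Path) -> PathBuf {
    if path == "~" {
        return home.to_path_buf();
    }
    match path.strip_prefix("~/") {
        Some(rest) => home.join(rest),
        None => PathBuf::from(path),
    }
}

/// Resolve `.` and `..` lexically, without touching the filesystem.
pub fn normalize(path: &Path) -> PathBuf {
    let mut out = PathBuf::new();
    for component in path.components() {
        match component {
            Component::CurDir => {}
            Component::ParentDir => {
                out.pop();
            }
            other => out.push(other.as_os_str()),
        }
    }
    out
}

/// True when `target` is one of the blocked paths or lies beneath one.
pub fn is_path_blocked(target: &str, blocked: &[&str], home: &Path) -> bool {
    let target = normalize(&expand_tilde(target, home));
    blocked.iter().any(|entry| {
        // `Path::starts_with` compares whole components, so "/etc" does not
        // match "/etcetera"
        target.starts_with(normalize(&expand_tilde(entry, home)))
    })
}
```

Because the normalization is purely lexical, a symlink that points into a blocked directory would still pass. Canonicalizing existing paths before the comparison is one possible way to close that gap.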

Task 8.2: Implement tool security validation

Files:

  • Modify: crates/zclaw-runtime/src/tool/builtin/shell_exec.rs

  • Step 1: Implement command whitelist validation

//! Shell execution tool with security controls

use async_trait::async_trait;
use serde::Deserialize;
use std::collections::HashSet;
use std::process::Command;
use std::time::{Duration, Instant};
use tokio::time::timeout;
use zclaw_types::{Result, ZclawError};

use super::{BuiltinTool, ToolContext, ToolResult};

/// Security configuration for shell execution
#[derive(Debug, Clone, Deserialize)]
pub struct ShellSecurityConfig {
    pub enabled: bool,
    pub default_timeout: u64,
    pub max_output_size: usize,
    pub allowed_commands: Vec<String>,
    pub blocked_commands: Vec<String>,
}

impl Default for ShellSecurityConfig {
    fn default() -> Self {
        Self {
            enabled: true,
            default_timeout: 60,
            max_output_size: 1024 * 1024, // 1MB
            allowed_commands: vec![
                "git".to_string(), "npm".to_string(), "pnpm".to_string(),
                "node".to_string(), "cargo".to_string(), "rustc".to_string(),
                "python".to_string(), "python3".to_string(), "pip".to_string(),
                "ls".to_string(), "cat".to_string(), "echo".to_string(),
            ],
            blocked_commands: vec![
                "rm -rf /".to_string(), "dd".to_string(), "mkfs".to_string(),
                "shutdown".to_string(), "reboot".to_string(),
            ],
        }
    }
}

impl ShellSecurityConfig {
    /// Load from config file
    pub fn load() -> Self {
        // Try to load from config/security.toml
        if let Ok(content) = std::fs::read_to_string("config/security.toml") {
            if let Ok(config) = toml::from_str::<SecurityConfigFile>(&content) {
                return config.shell_exec;
            }
        }
        Self::default()
    }

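    /// Check if a command is allowed.
    ///
    /// Only the first token is inspected. The command is later run through
    /// `sh -c` / `cmd /C`, so shell operators such as `;`, `&&`, or `|` can
    /// still chain programs that are not on the whitelist.
    pub fn is_command_allowed(&self, command: &str) -> Result<()> {
        // Honor the `enabled` switch from the configuration
        if !self.enabled {
            return Err(ZclawError::SecurityError(
                "Shell execution is disabled".to_string()
            ));
        }

        // Parse the base command (first whitespace-separated token)
        let base_cmd = command.split_whitespace().next().unwrap_or("");

        // Check blocked commands first. Single-word entries must equal the base
        // command, so "dd" does not block "git add" and "init" does not block
        // "git init"; multi-word entries such as "rm -rf /" match as a prefix.
        for blocked in &self.blocked_commands {
            let is_blocked = if blocked.contains(char::is_whitespace) {
                command.trim_start().starts_with(blocked.as_str())
            } else {
                base_cmd == blocked.as_str()
            };
            if is_blocked {
                return Err(ZclawError::SecurityError(
                    format!("Command blocked: {}", blocked)
                ));
            }
        }

        // If whitelist is non-empty, check against it
        if !self.allowed_commands.is_empty() {
            let allowed: HashSet<&str> = self.allowed_commands
                .iter()
                .map(|s| s.as_str())
                .collect();

            if !allowed.contains(base_cmd) {
                return Err(ZclawError::SecurityError(
                    format!("Command not in whitelist: {}", base_cmd)
                ));
            }
        }

        Ok(())
    }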
}

#[derive(Debug, Deserialize)]
struct SecurityConfigFile {
    shell_exec: ShellSecurityConfig,
}

/// Shell execution tool
pub struct ShellExecTool {
    config: ShellSecurityConfig,
}

impl ShellExecTool {
    pub fn new() -> Self {
        Self {
            config: ShellSecurityConfig::load(),
        }
    }
}

impl Default for ShellExecTool {
    fn default() -> Self {
        Self::new()
    }
}

#[async_trait]
impl BuiltinTool for ShellExecTool {
    fn name(&self) -> &str {
        "shell_exec"
    }

    fn description(&self) -> &str {
        "Execute shell commands with security controls"
    }

    fn input_schema(&self) -> serde_json::Value {
        serde_json::json!({
            "type": "object",
            "properties": {
                "command": {
                    "type": "string",
                    "description": "The shell command to execute"
                },
                "timeout": {
                    "type": "number",
                    "description": "Timeout in seconds (default: 60)"
                }
            },
            "required": ["command"]
        })
    }

    async fn execute(&self, _context: &ToolContext, input: serde_json::Value) -> Result<ToolResult> {
        #[derive(Deserialize)]
        struct Input {
            command: String,
            /// Timeout in seconds; falls back to the configured default when omitted
            timeout: Option<u64>,
        }

        let input: Input = serde_json::from_value(input)
            .map_err(|e| ZclawError::ToolError(format!("Invalid input: {}", e)))?;

        // Security check
        self.config.is_command_allowed(&input.command)?;

        // Determine timeout: configured default when omitted, capped at twice that value
        let timeout_secs = input
            .timeout
            .unwrap_or(self.config.default_timeout)
            .min(self.config.default_timeout * 2);
        let timeout_duration = Duration::from_secs(timeout_secs);

        // Execute command
        let start = Instant::now();

        // Use tokio's async Command (needs tokio's "process" feature). The blocking
        // std::process::Command would stall the executor, so the timeout could not fire.
        let (shell, flag) = if cfg!(target_os = "windows") {
            ("cmd", "/C")
        } else {
            ("sh", "-c")
        };
        let mut cmd = tokio::process::Command::new(shell);
        cmd.args([flag, input.command.as_str()])
            // Kill the child process when the timeout drops the pending future
            .kill_on_drop(true);

        let output = timeout(timeout_duration, cmd.output())
            .await
            .map_err(|_| ZclawError::ToolError(format!("Command timed out after {}s", timeout_secs)))?
            .map_err(|e| ZclawError::ToolError(format!("Failed to execute command: {}", e)))?;

        let duration_ms = start.elapsed().as_millis() as u64;

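        // Truncate output if too large
        let stdout = String::from_utf8_lossy(&output.stdout);
        let stderr = String::from_utf8_lossy(&output.stderr);

        // Back up to a UTF-8 character boundary so the slice cannot panic
        // in the middle of a multi-byte character
        let mut end = stdout.len().min(self.config.max_output_size);
        while !stdout.is_char_boundary(end) {
            end -= 1;
        }
        let stdout_truncated: &str = &stdout[..end];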

        Ok(ToolResult {
            success: output.status.success(),
            output: serde_json::json!({
                "stdout": stdout_truncated,
                "stderr": stderr,
                "exit_code": output.status.code(),
                "duration_ms": duration_ms,
            }),
            error: if output.status.success() {
                None
            } else {
                Some(format!("Exit code: {:?}", output.status.code()))
            },
        })
    }
}
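Truncating command output at a byte limit needs care with multi-byte UTF-8 text, because slicing a `str` at an offset inside a character panics. A small helper, sketched here with the hypothetical name `truncate_utf8`, keeps that logic in one place so it could serve both stdout and stderr.

```rust
/// Return the longest prefix of `s` that is at most `max_bytes` long and
/// ends on a UTF-8 character boundary.
pub fn truncate_utf8(s: &str, max_bytes: usize) -> &str {
    if s.len() <= max_bytes {
        return s;
    }
    // Index 0 is always a boundary, so this loop terminates
    let mut end = max_bytes;
    while !s.is_char_boundary(end) {
        end -= 1;
    }
    &s[..end]
}
```

The result can be up to three bytes shorter than the limit when the cut would otherwise land inside a four-byte character.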
  • Step 2: Add SecurityError to ZclawError

Add to crates/zclaw-types/src/error.rs:

SecurityError(String),
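A separate `SecurityError` variant lets callers tell a policy rejection apart from a command that ran and failed. The sketch below illustrates the idea with a stand-in enum. `SketchError` and `is_policy_violation` are hypothetical; the real `ZclawError` has more variants and may derive its `Display` implementation differently.

```rust
use std::fmt;

/// Hypothetical stand-in for `ZclawError`, reduced to the variants this plan touches.
#[derive(Debug, PartialEq)]
pub enum SketchError {
    ToolError(String),
    HandError(String),
    SecurityError(String),
}

impl fmt::Display for SketchError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            SketchError::ToolError(msg) => write!(f, "tool error: {}", msg),
            SketchError::HandError(msg) => write!(f, "hand error: {}", msg),
            SketchError::SecurityError(msg) => write!(f, "security error: {}", msg),
        }
    }
}

impl std::error::Error for SketchError {}

/// True when the error is a policy rejection, as opposed to an execution failure.
pub fn is_policy_violation(err: &SketchError) -> bool {
    matches!(err, SketchError::SecurityError(_))
}
```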
  • Step 3: Verify that it compiles

Run: cd crates/zclaw-runtime && cargo check
Expected: compiles successfully

  • Step 4: Commit the tool security implementation
git add crates/zclaw-runtime/src/tool/builtin/shell_exec.rs crates/zclaw-types/src/error.rs
git commit -m "feat(runtime): implement shell command security whitelist

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>"

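Milestone M4: Tool security complete

  • Verify the security features

Run: cd crates/zclaw-runtime && cargo test shell_exec -- --nocapture
Expected: security tests pass and malicious commands are blocked

  • Tag and push the milestone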
git tag v0.2.0-m4-security
git push origin v0.2.0-m4-security
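The M4 verification expects malicious commands to be rejected. As a rough illustration of cases such tests might cover, here is a standalone sketch of a command check that works on plain slices. `check_command` and `SHELL_METACHARACTERS` are hypothetical and are not the plan's `ShellSecurityConfig` API. Rejecting shell metacharacters outright is an added assumption, made because the tool runs commands through a shell.

```rust
/// Characters that let a shell chain or substitute additional commands.
const SHELL_METACHARACTERS: &[char] = &[';', '|', '&', '`', '$', '>', '<', '\n'];

/// Check `command` against a whitelist and a blacklist.
/// An empty `allowed` slice means "no whitelist".
pub fn check_command(command: &str, allowed: &[&str], blocked: &[&str]) -> Result<(), String> {
    let command = command.trim();
    let base = command
        .split_whitespace()
        .next()
        .ok_or_else(|| "Empty command".to_string())?;

    // Refuse anything that could chain a second program behind an allowed one
    if command.contains(SHELL_METACHARACTERS) {
        return Err("Shell metacharacters are not allowed".to_string());
    }

    for entry in blocked {
        // Single-word entries must equal the base command; multi-word
        // entries match as a prefix of the whole command line
        let hit = if entry.contains(char::is_whitespace) {
            command.starts_with(*entry)
        } else {
            base == *entry
        };
        if hit {
            return Err(format!("Command blocked: {}", entry));
        }
    }

    if !allowed.is_empty() && !allowed.contains(&base) {
        return Err(format!("Command not in whitelist: {}", base));
    }
    Ok(())
}
```

String matching of this kind is best-effort: variations in spacing or quoting can evade a multi-word entry, and banning `$` also rules out legitimate environment variable use. Treat it as one layer of defense, not a sandbox.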

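Release Preparation Checklist

Week 7-8: Final Acceptance

  • Feature acceptance

    • Streaming responses: messages are displayed character by character, latency < 500 ms
    • MCP protocol: connect to filesystem-mcp and read a file successfully
    • Browser Hand: open a page, click, and take a screenshot successfully
    • Tool security: malicious commands are blocked
    • Core conversation: send a message → receive a response → no crashes
  • Documentation acceptance

    • User manual updated
    • CHANGELOG written
    • README updated
  • Packaging acceptance

    • Windows installer builds successfully
    • Self-signed certificate configured
    • Installation test passes
  • Release acceptance

    • GitHub Release draft prepared
    • Feedback channel set up (GitHub Issues)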

Final Release Commands

# Build the release version
cd desktop && pnpm tauri build

# Create the tag
git tag -a v0.2.0 -m "ZCLAW v0.2.0 - Streaming, MCP, Browser Hand"

# Push the tag
git push origin v0.2.0

# Upload to GitHub Releases
gh release create v0.2.0 \
  ./desktop/src-tauri/target/release/bundle/msi/*.msi \
  ./desktop/src-tauri/target/release/bundle/nsis/*.exe \
  --title "ZCLAW v0.2.0" \
  --notes-file CHANGELOG.md

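Risk Contingency Plan

| Risk | Contingency |
| --- | --- |
| Streaming responses slip | Fall back to a simplified SSE approach |
| MCP compatibility issues | Prioritize support for filesystem-mcp |
| Browser dependency issues | Prompt the user to install Chromium |
| Insufficient test coverage | Extend Week 6 to add testing time |
| Crash after release | Ship a hotfix within 24 hours |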