zclaw_openfang/docs/superpowers/plans/2026-03-24-v0.2.0-release-implementation.md
2026-03-24 01:34:17 +08:00

# ZCLAW v0.2.0 Release Implementation Plan

> **For agentic workers:** REQUIRED: Use superpowers:subagent-driven-development (if subagents available) or superpowers:executing-plans to implement this plan. Steps use checkbox (`- [ ]`) syntax for tracking.

**Goal:** Ship the official ZCLAW v0.2.0 release, which includes streaming responses, the MCP protocol, Browser Hand, and tool-security features.

**Architecture:** A Tauri-based desktop application with a Rust backend and a React frontend. Streaming responses are delivered through the Tauri event system, MCP uses the JSON-RPC 2.0 protocol, and Browser Hand is built on playwright-rust.

**Tech Stack:** Rust, Tauri 2.x, React 19, TypeScript, playwright-rust, reqwest, tokio

**Spec Document:** `docs/superpowers/specs/2026-03-24-v0.2.0-release-plan-design.md`

---
## File Structure
### New files
```
crates/zclaw-runtime/
├── src/stream.rs            # Streaming response type definitions (modified)
├── src/driver/streaming.rs  # New - streaming driver implementation
crates/zclaw-protocols/
├── src/mcp_client.rs        # New - MCP client implementation
├── src/mcp_types.rs         # New - MCP JSON-RPC types
├── src/mcp_transport.rs     # New - MCP transport layer (stdio/HTTP/WS)
crates/zclaw-hands/
├── src/hands/browser.rs     # New - Browser Hand implementation
config/
├── security.toml            # New - security configuration
desktop/src/
├── lib/streaming-client.ts  # New - streaming client
```
### Modified files
```
crates/zclaw-runtime/src/driver/mod.rs        # Add stream() to the LlmDriver trait
crates/zclaw-runtime/src/driver/anthropic.rs  # Anthropic streaming implementation
crates/zclaw-runtime/src/driver/openai.rs     # OpenAI streaming implementation
crates/zclaw-runtime/src/loop_runner.rs       # Streaming loop implementation
crates/zclaw-protocols/src/mcp.rs             # Full MCP implementation
crates/zclaw-protocols/src/lib.rs             # Export the new modules
crates/zclaw-hands/src/hands/mod.rs           # Export Browser Hand
crates/zclaw-hands/Cargo.toml                 # Add the playwright dependency
desktop/src/store/chatStore.ts                # Streaming event handling
desktop/src-tauri/src/lib.rs                  # Tauri streaming command
```
---
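## Chunk 1: Streaming Responses - Backend Trait and Types
### Task 1.1: Define the streaming response types
**Files:**
- Modify: `crates/zclaw-runtime/src/stream.rs`
- [ ] **Step 1: Add the streaming response type definitions**
Add the following to `stream.rs`: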
```rust
//! Streaming response types
use serde::{Deserialize, Serialize};

/// Stream chunk emitted during streaming
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum StreamChunk {
    /// Text delta
    TextDelta { delta: String },
    /// Thinking delta (for extended thinking models)
    ThinkingDelta { delta: String },
    /// Tool use started
    ToolUseStart { id: String, name: String },
    /// Tool use input delta
    ToolUseDelta { id: String, delta: String },
    /// Tool use completed
    ToolUseEnd { id: String, input: serde_json::Value },
    /// Stream completed
    Complete {
        input_tokens: u32,
        output_tokens: u32,
        stop_reason: String,
    },
    /// Error occurred
    Error { message: String },
}

/// Streaming event for Tauri emission
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamEvent {
    /// Session ID for routing
    pub session_id: String,
    /// Agent ID for routing
    pub agent_id: String,
    /// The chunk content
    pub chunk: StreamChunk,
}

impl StreamEvent {
    pub fn new(session_id: impl Into<String>, agent_id: impl Into<String>, chunk: StreamChunk) -> Self {
        Self {
            session_id: session_id.into(),
            agent_id: agent_id.into(),
            chunk,
        }
    }
}
```
- [ ] **Step 2: Verify that it compiles**
Run: `cd crates/zclaw-runtime && cargo check`
Expected: compiles successfully with no errors
- [ ] **Step 3: Commit the type definitions**
```bash
git add crates/zclaw-runtime/src/stream.rs
git commit -m "feat(runtime): add streaming response types

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>"
```
---
### Task 1.2: Extend the LlmDriver trait
**Files:**
- Modify: `crates/zclaw-runtime/src/driver/mod.rs`
- [ ] **Step 1: Add a stream method to the trait**
Add the method to the `LlmDriver` trait:
```rust
use futures::Stream;
use std::pin::Pin;
use crate::stream::StreamChunk;

/// LLM Driver trait - unified interface for all providers
#[async_trait]
pub trait LlmDriver: Send + Sync {
    /// Get the provider name
    fn provider(&self) -> &str;

    /// Send a completion request
    async fn complete(&self, request: CompletionRequest) -> Result<CompletionResponse>;

    /// Send a streaming completion request
    /// Returns a stream of chunks
    fn stream(
        &self,
        request: CompletionRequest,
    ) -> Pin<Box<dyn Stream<Item = Result<StreamChunk>> + Send + '_>>;

    /// Check if the driver is properly configured
    fn is_configured(&self) -> bool;
}
```
- [ ] **Step 2: Add the futures dependency to Cargo.toml**
Check whether `crates/zclaw-runtime/Cargo.toml` already depends on `futures`; if not, add:
```toml
futures = "0.3"
```
- [ ] **Step 3: Verify compilation (expected to fail)**
Run: `cd crates/zclaw-runtime && cargo check 2>&1`
Expected: compilation fails because the drivers do not yet implement `stream()`
- [ ] **Step 4: Commit the trait extension**
```bash
git add crates/zclaw-runtime/src/driver/mod.rs crates/zclaw-runtime/Cargo.toml
git commit -m "feat(runtime): add stream() method to LlmDriver trait

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>"
```
---
## Chunk 2: Streaming Responses - Anthropic Driver Implementation
### Task 2.1: Implement the Anthropic streaming API
**Files:**
- Modify: `crates/zclaw-runtime/src/driver/anthropic.rs`
- [ ] **Step 1: Add the streaming request types**
Add the SSE parsing types to `anthropic.rs`:
```rust
use futures::{Stream, StreamExt};
use crate::stream::StreamChunk;
use std::pin::Pin;

// `AnthropicContentBlock` (with `block_type`, `id` and `name`) is assumed to
// be defined elsewhere in this file.

/// SSE event from Anthropic API
#[derive(Debug, Deserialize)]
struct AnthropicStreamEvent {
    #[serde(rename = "type")]
    event_type: String,
    #[serde(default)]
    index: Option<u32>,
    #[serde(default)]
    delta: Option<AnthropicDelta>,
    #[serde(default)]
    content_block: Option<AnthropicContentBlock>,
    /// Present on `message_start`; carries the input token count
    #[serde(default)]
    message: Option<AnthropicStreamMessage>,
    /// Present on `message_delta`; carries the output token count
    #[serde(default)]
    usage: Option<AnthropicStreamUsage>,
    /// Present on `error` events
    #[serde(default)]
    error: Option<AnthropicStreamError>,
}

#[derive(Debug, Deserialize)]
struct AnthropicDelta {
    #[serde(default)]
    text: Option<String>,
    #[serde(default)]
    thinking: Option<String>,
    #[serde(default)]
    partial_json: Option<String>,
    /// Set on `message_delta` events
    #[serde(default)]
    stop_reason: Option<String>,
}

#[derive(Debug, Deserialize)]
struct AnthropicStreamMessage {
    #[serde(default)]
    usage: Option<AnthropicStreamUsage>,
}

/// Usage as it appears in stream events; either count may be absent
#[derive(Debug, Deserialize)]
struct AnthropicStreamUsage {
    #[serde(default)]
    input_tokens: u32,
    #[serde(default)]
    output_tokens: u32,
}

#[derive(Debug, Deserialize)]
struct AnthropicStreamError {
    #[serde(default)]
    message: String,
}
```
- [ ] **Step 2: Implement the stream method**
```rust
fn stream(
    &self,
    request: CompletionRequest,
) -> Pin<Box<dyn Stream<Item = Result<StreamChunk>> + Send + '_>> {
    Box::pin(async_stream::stream! {
        let mut stream_request = self.build_api_request(&request);
        stream_request.stream = true;
        let response = match self.client
            .post(format!("{}/v1/messages", self.base_url))
            .header("x-api-key", self.api_key.expose_secret())
            .header("anthropic-version", "2023-06-01")
            .header("content-type", "application/json")
            .json(&stream_request)
            .send()
            .await
        {
            Ok(r) => r,
            Err(e) => {
                yield Err(ZclawError::LlmError(format!("HTTP request failed: {}", e)));
                return;
            }
        };
        if !response.status().is_success() {
            let status = response.status();
            let body = response.text().await.unwrap_or_default();
            yield Err(ZclawError::LlmError(format!("API error {}: {}", status, body)));
            return;
        }
        let mut byte_stream = response.bytes_stream();
        // Bytes received so far that do not yet form a complete line
        let mut line_buffer: Vec<u8> = Vec::new();
        let mut current_tool_id: Option<String> = None;
        let mut tool_input_buffer = String::new();
        let mut input_tokens = 0u32;
        while let Some(chunk_result) = byte_stream.next().await {
            let chunk = match chunk_result {
                Ok(c) => c,
                Err(e) => {
                    yield Err(ZclawError::LlmError(format!("Stream error: {}", e)));
                    return;
                }
            };
            line_buffer.extend_from_slice(&chunk);
            // Network chunks need not align with SSE lines, so parse complete lines only
            while let Some(pos) = line_buffer.iter().position(|&b| b == b'\n') {
                let raw: Vec<u8> = line_buffer.drain(..=pos).collect();
                let line = String::from_utf8_lossy(&raw).into_owned();
                let data = match line.trim_end().strip_prefix("data: ") {
                    Some(d) if d != "[DONE]" => d,
                    _ => continue,
                };
                let event = match serde_json::from_str::<AnthropicStreamEvent>(data) {
                    Ok(event) => event,
                    Err(e) => {
                        // Log but don't yield error for parse failures
                        tracing::warn!("Failed to parse SSE event: {} - {}", e, data);
                        continue;
                    }
                };
                match event.event_type.as_str() {
                    "message_start" => {
                        // Input tokens are reported once, at the start of the message
                        if let Some(usage) = event.message.and_then(|m| m.usage) {
                            input_tokens = usage.input_tokens;
                        }
                    }
                    "content_block_start" => {
                        if let Some(block) = event.content_block {
                            if block.block_type.as_str() == "tool_use" {
                                current_tool_id = block.id.clone();
                                tool_input_buffer.clear();
                                yield Ok(StreamChunk::ToolUseStart {
                                    id: block.id.unwrap_or_default(),
                                    name: block.name.unwrap_or_default(),
                                });
                            }
                        }
                    }
                    "content_block_delta" => {
                        if let Some(delta) = event.delta {
                            if let Some(text) = delta.text {
                                yield Ok(StreamChunk::TextDelta { delta: text });
                            }
                            if let Some(thinking) = delta.thinking {
                                yield Ok(StreamChunk::ThinkingDelta { delta: thinking });
                            }
                            if let Some(json) = delta.partial_json {
                                tool_input_buffer.push_str(&json);
                            }
                        }
                    }
                    "content_block_stop" => {
                        if let Some(id) = current_tool_id.take() {
                            // A tool call without arguments leaves the buffer empty
                            let input: serde_json::Value = serde_json::from_str(&tool_input_buffer)
                                .unwrap_or(serde_json::Value::Object(Default::default()));
                            yield Ok(StreamChunk::ToolUseEnd { id, input });
                            tool_input_buffer.clear();
                        }
                    }
                    "message_delta" => {
                        // stop_reason arrives in `delta`, output tokens in the top-level `usage`
                        if let Some(stop_reason) = event.delta.and_then(|d| d.stop_reason) {
                            yield Ok(StreamChunk::Complete {
                                input_tokens,
                                output_tokens: event.usage.map(|u| u.output_tokens).unwrap_or(0),
                                stop_reason,
                            });
                        }
                    }
                    "error" => {
                        let message = event.error
                            .map(|e| e.message)
                            .unwrap_or_else(|| "Stream error".to_string());
                        yield Ok(StreamChunk::Error { message });
                    }
                    _ => {}
                }
            }
        }
    })
}
```
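The byte chunks returned by an HTTP response stream are not guaranteed to end on SSE line boundaries, so a `data:` line can be split across two chunks. The following is a minimal, standard-library-only sketch of buffering bytes until a full line is available. `SseLineBuffer` is a hypothetical helper introduced here for illustration; it is not part of the plan or of any library.

```rust
/// Accumulates raw network chunks and returns the payload of every complete
/// `data:` line. Hypothetical helper, shown only to illustrate the buffering.
#[derive(Default)]
pub struct SseLineBuffer {
    pending: Vec<u8>,
}

impl SseLineBuffer {
    /// Feed one network chunk and return the `data:` payloads it completed.
    pub fn push(&mut self, chunk: &[u8]) -> Vec<String> {
        self.pending.extend_from_slice(chunk);
        let mut payloads = Vec::new();
        // Consume one line at a time; bytes after the last newline stay buffered.
        while let Some(pos) = self.pending.iter().position(|&b| b == b'\n') {
            let raw: Vec<u8> = self.pending.drain(..=pos).collect();
            // Decoding whole lines avoids cutting a multi-byte character in half.
            let decoded = String::from_utf8_lossy(&raw);
            let line = decoded.trim_end_matches(|c: char| c == '\n' || c == '\r');
            if let Some(data) = line.strip_prefix("data:") {
                // The SSE format allows one optional space after the colon.
                payloads.push(data.strip_prefix(' ').unwrap_or(data).to_string());
            }
        }
        payloads
    }
}
```

Feeding the buffer `data: {"a"` and then `:1}` plus a newline yields a single payload, `{"a":1}`, on the second call. Parsing each chunk on its own would instead see two fragments that are both invalid JSON.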
- [ ] **Step 3: Add the async-stream dependency**
Add to `crates/zclaw-runtime/Cargo.toml`:
```toml
async-stream = "0.3"
tracing = "0.1"
```
- [ ] **Step 4: Verify that it compiles**
Run: `cd crates/zclaw-runtime && cargo check`
Expected: compiles successfully
- [ ] **Step 5: Commit the Anthropic streaming implementation**
```bash
git add crates/zclaw-runtime/src/driver/anthropic.rs crates/zclaw-runtime/Cargo.toml
git commit -m "feat(runtime): implement streaming for Anthropic driver

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>"
```
---
### Task 2.2: Implement the OpenAI streaming API
**Files:**
- Modify: `crates/zclaw-runtime/src/driver/openai.rs`
- [ ] **Step 1: Read the existing OpenAI driver code**
Run: `head -100 crates/zclaw-runtime/src/driver/openai.rs`
Expected: review the structure of the existing implementation
- [ ] **Step 2: Add the OpenAI streaming implementation**
Add a similar stream method to `openai.rs`, adapted to OpenAI's SSE format:
```rust
fn stream(
    &self,
    request: CompletionRequest,
) -> Pin<Box<dyn Stream<Item = Result<StreamChunk>> + Send + '_>> {
    Box::pin(async_stream::stream! {
        // Assumes build_api_request() carries over `request.stream`; if it does
        // not, streaming has to be enabled on the API request here.
        let stream_request = self.build_api_request(&request);
        let response = match self.client
            .post(format!("{}/v1/chat/completions", self.base_url))
            .header("Authorization", format!("Bearer {}", self.api_key.expose_secret()))
            .header("Content-Type", "application/json")
            .json(&stream_request)
            .send()
            .await
        {
            Ok(r) => r,
            Err(e) => {
                yield Err(ZclawError::LlmError(format!("HTTP request failed: {}", e)));
                return;
            }
        };
        if !response.status().is_success() {
            let status = response.status();
            let body = response.text().await.unwrap_or_default();
            yield Err(ZclawError::LlmError(format!("API error {}: {}", status, body)));
            return;
        }
        let mut byte_stream = response.bytes_stream();
        // Bytes received so far that do not yet form a complete line
        let mut line_buffer: Vec<u8> = Vec::new();
        while let Some(chunk_result) = byte_stream.next().await {
            let chunk = match chunk_result {
                Ok(c) => c,
                Err(e) => {
                    yield Err(ZclawError::LlmError(format!("Stream error: {}", e)));
                    return;
                }
            };
            line_buffer.extend_from_slice(&chunk);
            // Network chunks need not align with SSE lines, so parse complete lines only
            while let Some(pos) = line_buffer.iter().position(|&b| b == b'\n') {
                let raw: Vec<u8> = line_buffer.drain(..=pos).collect();
                let line = String::from_utf8_lossy(&raw).into_owned();
                let data = match line.trim_end().strip_prefix("data: ") {
                    Some(d) => d,
                    None => continue,
                };
                if data == "[DONE]" {
                    // This sentinel carries no usage data, so token counts stay at 0
                    yield Ok(StreamChunk::Complete {
                        input_tokens: 0,
                        output_tokens: 0,
                        stop_reason: "end_turn".to_string(),
                    });
                    return;
                }
                match serde_json::from_str::<OpenAiStreamResponse>(data) {
                    Ok(resp) => {
                        if let Some(choice) = resp.choices.first() {
                            // `delta` is not optional in OpenAiStreamChoice
                            if let Some(content) = &choice.delta.content {
                                yield Ok(StreamChunk::TextDelta { delta: content.clone() });
                            }
                            if let Some(tool_calls) = &choice.delta.tool_calls {
                                for tc in tool_calls {
                                    if let Some(function) = &tc.function {
                                        yield Ok(StreamChunk::ToolUseDelta {
                                            id: tc.id.clone().unwrap_or_default(),
                                            delta: function.arguments.clone().unwrap_or_default(),
                                        });
                                    }
                                }
                            }
                        }
                    }
                    Err(e) => {
                        tracing::warn!("Failed to parse OpenAI SSE: {}", e);
                    }
                }
            }
        }
    })
}
```
- [ ] **Step 3: Add the OpenAI streaming types**
```rust
#[derive(Debug, Deserialize)]
struct OpenAiStreamResponse {
    choices: Vec<OpenAiStreamChoice>,
}

#[derive(Debug, Deserialize)]
struct OpenAiStreamChoice {
    delta: OpenAiDelta,
    finish_reason: Option<String>,
}

#[derive(Debug, Deserialize)]
struct OpenAiDelta {
    content: Option<String>,
    tool_calls: Option<Vec<OpenAiToolCallDelta>>,
}

#[derive(Debug, Deserialize)]
struct OpenAiToolCallDelta {
    id: Option<String>,
    function: Option<OpenAiFunctionDelta>,
}

#[derive(Debug, Deserialize)]
struct OpenAiFunctionDelta {
    arguments: Option<String>,
}
```
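In OpenAI's streaming format, a tool call arrives as several fragments: the first carries the call `id` and function name, and later fragments carry only an `index` and another piece of the `arguments` string. The sketch below shows one way to reassemble calls by index, using only the standard library. `ToolCallFragment`, `AssembledToolCall`, and `ToolCallAssembler` are hypothetical names introduced for illustration; the plan's serde types above do not include an `index` field, so wiring this in would require adding one.

```rust
use std::collections::BTreeMap;

/// One streamed tool-call fragment. The field names mirror the wire format,
/// but the struct is a hypothetical stand-in for the serde types.
pub struct ToolCallFragment<'a> {
    pub index: usize,
    pub id: Option<&'a str>,
    pub name: Option<&'a str>,
    pub arguments: Option<&'a str>,
}

/// A tool call after all of its fragments have been merged.
#[derive(Debug, Default, PartialEq)]
pub struct AssembledToolCall {
    pub id: String,
    pub name: String,
    pub arguments: String,
}

/// Merges fragments that share an index into one call each.
#[derive(Default)]
pub struct ToolCallAssembler {
    calls: BTreeMap<usize, AssembledToolCall>,
}

impl ToolCallAssembler {
    pub fn push(&mut self, fragment: ToolCallFragment<'_>) {
        let call = self.calls.entry(fragment.index).or_default();
        // `id` and `name` normally appear only in the first fragment.
        if let Some(id) = fragment.id {
            call.id = id.to_string();
        }
        if let Some(name) = fragment.name {
            call.name = name.to_string();
        }
        // Argument fragments are concatenated in arrival order.
        if let Some(arguments) = fragment.arguments {
            call.arguments.push_str(arguments);
        }
    }

    /// Returns the assembled calls ordered by index.
    pub fn finish(self) -> Vec<AssembledToolCall> {
        self.calls.into_values().collect()
    }
}
```

Keying on the index rather than the `id` matters because an `id` that is missing from a later fragment would otherwise become an empty string, and the fragment could no longer be matched to its call.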
- [ ] **Step 4: Verify that it compiles**
Run: `cd crates/zclaw-runtime && cargo check`
Expected: compiles successfully
- [ ] **Step 5: Commit the OpenAI streaming implementation**
```bash
git add crates/zclaw-runtime/src/driver/openai.rs
git commit -m "feat(runtime): implement streaming for OpenAI driver

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>"
```
---
## Chunk 3: Streaming Responses - LoopRunner and Tauri Integration
### Task 3.1: Implement the LoopRunner streaming loop
**Files:**
- Modify: `crates/zclaw-runtime/src/loop_runner.rs`
- [ ] **Step 1: Update the run_streaming method**
Replace the existing TODO placeholder:
```rust
use crate::stream::{StreamChunk, StreamEvent};
use tauri::{AppHandle, Emitter};

/// Run the agent loop with streaming
pub async fn run_streaming(
    &self,
    session_id: SessionId,
    input: String,
    app_handle: Option<AppHandle>,
) -> Result<mpsc::Receiver<LoopEvent>> {
    let (tx, rx) = mpsc::channel(100);

    // Add user message to session
    let user_message = Message::user(input);
    self.memory.append_message(&session_id, &user_message).await?;

    // Get all messages for context
    let messages = self.memory.get_messages(&session_id).await?;

    // Build completion request
    let request = CompletionRequest {
        model: self.model.clone(),
        system: self.system_prompt.clone(),
        messages,
        tools: self.tools.definitions(),
        max_tokens: Some(self.max_tokens),
        temperature: Some(self.temperature),
        stop: Vec::new(),
        stream: true,
    };

    // Clone necessary data for the async task
    let session_id_str = session_id.to_string();
    let agent_id_str = self.agent_id.to_string();
    let memory = self.memory.clone();
    let driver = self.driver.clone();

    tokio::spawn(async move {
        let mut full_response = String::new();
        let mut input_tokens = 0u32;
        let mut output_tokens = 0u32;
        let mut stream = driver.stream(request);
        use futures::StreamExt;
        while let Some(chunk_result) = stream.next().await {
            match chunk_result {
                Ok(chunk) => {
                    // Emit to Tauri if available
                    if let Some(ref handle) = app_handle {
                        let event = StreamEvent::new(&session_id_str, &agent_id_str, chunk.clone());
                        let _ = handle.emit("stream:chunk", event);
                    }
                    // Track response and tokens
                    match &chunk {
                        StreamChunk::TextDelta { delta } => {
                            full_response.push_str(delta);
                            let _ = tx.send(LoopEvent::Delta(delta.clone())).await;
                        }
                        StreamChunk::ThinkingDelta { delta } => {
                            // "[思考]" is the UI prefix for "[thinking]"
                            let _ = tx.send(LoopEvent::Delta(format!("[思考] {}", delta))).await;
                        }
                        StreamChunk::ToolUseStart { name, .. } => {
                            let _ = tx.send(LoopEvent::ToolStart {
                                name: name.clone(),
                                input: serde_json::Value::Null,
                            }).await;
                        }
                        StreamChunk::ToolUseEnd { input, .. } => {
                            let _ = tx.send(LoopEvent::ToolEnd {
                                name: String::new(),
                                output: input.clone(),
                            }).await;
                        }
                        StreamChunk::Complete { input_tokens: it, output_tokens: ot, .. } => {
                            input_tokens = *it;
                            output_tokens = *ot;
                        }
                        StreamChunk::Error { message } => {
                            let _ = tx.send(LoopEvent::Error(message.clone())).await;
                        }
                        _ => {}
                    }
                }
                Err(e) => {
                    let _ = tx.send(LoopEvent::Error(e.to_string())).await;
                }
            }
        }

        // Save assistant message to memory
        let assistant_message = Message::assistant(full_response.clone());
        let _ = memory.append_message(&session_id, &assistant_message).await;

        // Send completion event
        let _ = tx.send(LoopEvent::Complete(AgentLoopResult {
            response: full_response,
            input_tokens,
            output_tokens,
            iterations: 1,
        })).await;
    });

    Ok(rx)
}
```
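The method above follows a producer/consumer pattern: a background task pushes events into a channel, ignores send failures, and finishes with a completion event, while the caller drains the receiver. A minimal sketch of that contract, using `std::sync::mpsc` and a thread in place of tokio so that it runs without extra crates, might look like this. `Event`, `produce`, and `collect_events` are hypothetical names for illustration and only approximate the plan's `LoopEvent`.

```rust
use std::sync::mpsc;
use std::thread;

/// Simplified, hypothetical stand-in for the plan's `LoopEvent`.
#[derive(Debug, PartialEq)]
pub enum Event {
    Delta(String),
    Error(String),
    Complete { response: String },
}

/// Producer: sends one event per delta, then a final `Complete`.
pub fn produce(deltas: Vec<&'static str>) -> mpsc::Receiver<Event> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let mut full = String::new();
        for delta in deltas {
            full.push_str(delta);
            // A send error only means the receiver was dropped; keep going.
            let _ = tx.send(Event::Delta(delta.to_string()));
        }
        let _ = tx.send(Event::Complete { response: full });
    });
    rx
}

/// Consumer: accumulates displayed text and errors until `Complete` arrives
/// or the channel closes.
pub fn collect_events(rx: mpsc::Receiver<Event>) -> (String, Vec<String>) {
    let mut shown = String::new();
    let mut errors = Vec::new();
    for event in rx {
        match event {
            Event::Delta(delta) => shown.push_str(&delta),
            Event::Error(message) => errors.push(message),
            Event::Complete { .. } => break,
        }
    }
    (shown, errors)
}
```

Because the producer discards send errors, dropping the receiver early does not stop the background work. The same holds for the plan's task, which keeps emitting Tauri events even when nobody reads the channel.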
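- [ ] **Step 2: Add the tauri dependency (optional)**
If the runtime crate needs to use tauri, add to `Cargo.toml`:
```toml
[dependencies]
tauri = { version = "2", optional = true }
[features]
default = []
tauri = ["dep:tauri"]
```
Because the dependency is optional and not part of the default features, code that references `tauri` only compiles when the feature is enabled. Either gate that code with `#[cfg(feature = "tauri")]` or enable the feature when checking.
- [ ] **Step 3: Verify that it compiles**
Run: `cd crates/zclaw-runtime && cargo check --features tauri`
Expected: compiles successfully
- [ ] **Step 4: Commit the LoopRunner streaming implementation**
```bash
git add crates/zclaw-runtime/src/loop_runner.rs crates/zclaw-runtime/Cargo.toml
git commit -m "feat(runtime): implement streaming in AgentLoop

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>"
```
---
### Task 3.2: Add the Tauri streaming command
**Files:**
- Modify: `desktop/src-tauri/src/lib.rs`
- [ ] **Step 1: Add the streaming chat command**
Add to the Tauri commands: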
```rust
use zclaw_runtime::{AgentLoop, LoopEvent};
use tauri::{AppHandle, Emitter};

#[tauri::command]
async fn chat_stream(
    agent_id: String,
    session_id: String,
    message: String,
    app: AppHandle,
    state: State<'_, KernelState>,
) -> Result<(), String> {
    let agent_loop = state.kernel
        .get_agent_loop(&agent_id)
        .await
        .map_err(|e| e.to_string())?;
    let session: zclaw_types::SessionId = session_id.parse()
        .map_err(|_| "Invalid session ID".to_string())?;
    // Chunks reach the frontend through the Tauri events emitted inside
    // run_streaming, so the channel receiver is intentionally left unread.
    // This command only initiates the stream.
    let _rx = agent_loop
        .run_streaming(session, message, Some(app))
        .await
        .map_err(|e| e.to_string())?;
    Ok(())
}
```
- [ ] **Step 2: Register the command**
Register it in the `invoke_handler` in `lib.rs`:
```rust
.invoke_handler(tauri::generate_handler![
    // ... existing commands
    chat_stream,
])
```
- [ ] **Step 3: Verify that it compiles**
Run: `cd desktop && pnpm tauri build --debug 2>&1 | head -50`
Expected: the build starts; check the output for errors
- [ ] **Step 4: Commit the Tauri streaming command**
```bash
git add desktop/src-tauri/src/lib.rs
git commit -m "feat(tauri): add chat_stream command for streaming

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>"
```
---
## Chunk 4: Streaming Responses - Frontend Integration
### Task 4.1: Create the streaming client
**Files:**
- Create: `desktop/src/lib/streaming-client.ts`
- [ ] **Step 1: Create the streaming client class**
```typescript
// desktop/src/lib/streaming-client.ts
import { listen, UnlistenFn } from '@tauri-apps/api/event';

export interface StreamChunk {
  // Mirrors the snake_case variant tags of the Rust `StreamChunk` enum
  type:
    | 'text_delta'
    | 'thinking_delta'
    | 'tool_use_start'
    | 'tool_use_delta'
    | 'tool_use_end'
    | 'complete'
    | 'error';
  delta?: string;
  id?: string;
  name?: string;
  input?: unknown;
  input_tokens?: number;
  output_tokens?: number;
  stop_reason?: string;
  message?: string;
}

export interface StreamEvent {
  session_id: string;
  agent_id: string;
  chunk: StreamChunk;
}

export type StreamHandler = (chunk: StreamChunk) => void;

export class StreamingClient {
  private unlisten: UnlistenFn | null = null;
  private currentSessionId: string | null = null;
  private handlers: Map<string, StreamHandler[]> = new Map();

  async connect(): Promise<void> {
    this.unlisten = await listen<StreamEvent>('stream:chunk', (event) => {
      const { session_id, chunk } = event.payload;
      // Only chunks for the currently subscribed session are dispatched
      if (this.currentSessionId && session_id === this.currentSessionId) {
        const handlers = this.handlers.get(session_id) || [];
        handlers.forEach(handler => handler(chunk));
      }
    });
  }

  async disconnect(): Promise<void> {
    if (this.unlisten) {
      this.unlisten();
      this.unlisten = null;
    }
    this.handlers.clear();
  }

  subscribe(sessionId: string, handler: StreamHandler): () => void {
    this.currentSessionId = sessionId;
    if (!this.handlers.has(sessionId)) {
      this.handlers.set(sessionId, []);
    }
    this.handlers.get(sessionId)!.push(handler);
    // Return unsubscribe function
    return () => {
      const handlers = this.handlers.get(sessionId);
      if (handlers) {
        const index = handlers.indexOf(handler);
        if (index > -1) {
          handlers.splice(index, 1);
        }
      }
    };
  }

  clearSession(sessionId: string): void {
    this.handlers.delete(sessionId);
    if (this.currentSessionId === sessionId) {
      this.currentSessionId = null;
    }
  }
}

// Singleton instance
let streamingClient: StreamingClient | null = null;

export async function getStreamingClient(): Promise<StreamingClient> {
  if (!streamingClient) {
    streamingClient = new StreamingClient();
    await streamingClient.connect();
  }
  return streamingClient;
}
```
- [ ] **Step 2: Verify TypeScript compilation**
Run: `cd desktop && pnpm tsc --noEmit`
Expected: no type errors
- [ ] **Step 3: Commit the streaming client**
```bash
git add desktop/src/lib/streaming-client.ts
git commit -m "feat(frontend): add streaming client for Tauri events

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>"
```
---
### Task 4.2: Integrate into ChatStore
**Files:**
- Modify: `desktop/src/store/chatStore.ts`
- [ ] **Step 1: Add streaming state and handling**
```typescript
import { getStreamingClient, StreamChunk } from '../lib/streaming-client';
import { invoke } from '@tauri-apps/api/core';

interface ChatState {
  // ... existing state
  streamingMessage: string;
  isStreaming: boolean;
  streamError: string | null;
}

// In store actions:
async sendStreamingMessage(content: string) {
  const sessionId = this.currentSessionId;
  const agentId = this.currentAgentId;
  if (!sessionId || !agentId) return;

  // Add user message immediately
  this.addMessage({
    role: 'user',
    content,
  });

  // Prepare for streaming
  this.streamingMessage = '';
  this.isStreaming = true;
  this.streamError = null;

  // Create placeholder assistant message
  const assistantMessageId = this.addMessage({
    role: 'assistant',
    content: '',
  });

  // Declared outside the try block so the catch branch can also unsubscribe
  let unsubscribe: (() => void) | undefined;
  try {
    const client = await getStreamingClient();
    unsubscribe = client.subscribe(sessionId, (chunk: StreamChunk) => {
      switch (chunk.type) {
        case 'text_delta':
          this.streamingMessage += chunk.delta || '';
          this.updateMessage(assistantMessageId, {
            content: this.streamingMessage,
          });
          break;
        case 'thinking_delta':
          // Handle thinking if needed
          break;
        case 'tool_use_start':
          this.addMessage({
            role: 'tool_use',
            content: '',
            toolName: chunk.name,
            toolId: chunk.id,
          });
          break;
        case 'complete':
          this.isStreaming = false;
          unsubscribe?.();
          break;
        case 'error':
          this.streamError = chunk.message || 'Unknown error';
          this.isStreaming = false;
          unsubscribe?.();
          break;
      }
    });

    // Invoke the streaming command
    await invoke('chat_stream', {
      agentId,
      sessionId,
      message: content,
    });
  } catch (error) {
    this.streamError = String(error);
    this.isStreaming = false;
    // Without this the handler would stay registered after a failed invoke
    unsubscribe?.();
  }
}
```
- [ ] **Step 2: Verify TypeScript compilation**
Run: `cd desktop && pnpm tsc --noEmit`
Expected: no type errors
- [ ] **Step 3: Commit the ChatStore streaming integration**
```bash
git add desktop/src/store/chatStore.ts
git commit -m "feat(frontend): integrate streaming into ChatStore

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>"
```
---
### Task 4.3: Update the chat UI component
**Files:**
- Modify: `desktop/src/components/ChatPanel.tsx` (or the corresponding component)
- [ ] **Step 1: Add the streaming send logic**
In the message-send handler, branch on whether streaming is enabled:
```tsx
const handleSend = async (content: string) => {
  if (useStreaming) {
    await chatStore.sendStreamingMessage(content);
  } else {
    await chatStore.sendMessage(content);
  }
};
```
- [ ] **Step 2: Add a streaming indicator**
```tsx
{chatStore.isStreaming && (
  <div className="flex items-center gap-2 text-sm text-gray-500">
    <span className="animate-pulse"></span>
    {/* UI label, "Generating..." */}
    正在生成...
  </div>
)}
```
- [ ] **Step 3: Verify that it compiles**
Run: `cd desktop && pnpm tsc --noEmit`
Expected: no type errors
- [ ] **Step 4: Commit the UI update**
```bash
git add desktop/src/components/ChatPanel.tsx
git commit -m "feat(frontend): add streaming UI indicators

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>"
```
---
## Milestone M1: Streaming Responses Complete
- [ ] **Verify the streaming response feature**
Run: `pnpm desktop` to launch the app
Expected:
1. After a message is sent, the reply appears incrementally
2. An indicator is shown while streaming is in progress
3. The state is correct once streaming completes
- [ ] **Tag the milestone**
```bash
git tag v0.2.0-m1-streaming
git push origin v0.2.0-m1-streaming
```
---
## Chunk 5: MCP Protocol - Transport Layer and Type Definitions
### Task 5.1: Create the MCP JSON-RPC types
**Files:**
- Create: `crates/zclaw-protocols/src/mcp_types.rs`
- [ ] **Step 1: Create the MCP JSON-RPC types file**
```rust
//! MCP JSON-RPC 2.0 types
//!
//! MCP payloads use camelCase field names on the wire (`protocolVersion`,
//! `inputSchema`, ...), hence the `rename_all` attributes below.
use serde::{Deserialize, Serialize};

/// JSON-RPC 2.0 Request
#[derive(Debug, Clone, Serialize)]
pub struct JsonRpcRequest {
    pub jsonrpc: &'static str,
    pub id: u64,
    pub method: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub params: Option<serde_json::Value>,
}

impl JsonRpcRequest {
    pub fn new(id: u64, method: impl Into<String>) -> Self {
        Self {
            jsonrpc: "2.0",
            id,
            method: method.into(),
            params: None,
        }
    }

    pub fn with_params(mut self, params: serde_json::Value) -> Self {
        self.params = Some(params);
        self
    }
}

/// JSON-RPC 2.0 Response
#[derive(Debug, Clone, Deserialize)]
pub struct JsonRpcResponse {
    pub jsonrpc: String,
    pub id: u64,
    #[serde(default)]
    pub result: Option<serde_json::Value>,
    #[serde(default)]
    pub error: Option<JsonRpcError>,
}

/// JSON-RPC Error
#[derive(Debug, Clone, Deserialize)]
pub struct JsonRpcError {
    pub code: i32,
    pub message: String,
    #[serde(default)]
    pub data: Option<serde_json::Value>,
}

/// MCP Initialize Request
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct InitializeRequest {
    pub protocol_version: String,
    pub capabilities: ClientCapabilities,
    pub client_info: Implementation,
}

impl Default for InitializeRequest {
    fn default() -> Self {
        Self {
            protocol_version: "2024-11-05".to_string(),
            capabilities: ClientCapabilities::default(),
            client_info: Implementation {
                name: "zclaw".to_string(),
                version: env!("CARGO_PKG_VERSION").to_string(),
            },
        }
    }
}

/// Client capabilities
#[derive(Debug, Clone, Serialize, Default)]
pub struct ClientCapabilities {
    #[serde(skip_serializing_if = "Option::is_none")]
    pub roots: Option<RootsCapability>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub sampling: Option<SamplingCapability>,
}

#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct RootsCapability {
    pub list_changed: bool,
}

#[derive(Debug, Clone, Serialize, Default)]
pub struct SamplingCapability {}

/// Server capabilities (from initialize response)
#[derive(Debug, Clone, Deserialize, Default)]
pub struct ServerCapabilities {
    #[serde(default)]
    pub tools: Option<ToolsCapability>,
    #[serde(default)]
    pub resources: Option<ResourcesCapability>,
    #[serde(default)]
    pub prompts: Option<PromptsCapability>,
    #[serde(default)]
    pub logging: Option<LoggingCapability>,
}

#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ToolsCapability {
    #[serde(default)]
    pub list_changed: bool,
}

#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ResourcesCapability {
    #[serde(default)]
    pub subscribe: bool,
    #[serde(default)]
    pub list_changed: bool,
}

#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PromptsCapability {
    #[serde(default)]
    pub list_changed: bool,
}

#[derive(Debug, Clone, Deserialize)]
pub struct LoggingCapability {}

/// Implementation info
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Implementation {
    pub name: String,
    pub version: String,
}

/// Initialize result
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct InitializeResult {
    pub protocol_version: String,
    pub capabilities: ServerCapabilities,
    pub server_info: Implementation,
}

/// Tool from tools/list
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Tool {
    pub name: String,
    #[serde(default)]
    pub description: Option<String>,
    pub input_schema: serde_json::Value,
}

/// Resource from resources/list
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Resource {
    pub uri: String,
    pub name: String,
    #[serde(default)]
    pub description: Option<String>,
    #[serde(default)]
    pub mime_type: Option<String>,
}

/// Prompt from prompts/list
#[derive(Debug, Clone, Deserialize)]
pub struct Prompt {
    pub name: String,
    #[serde(default)]
    pub description: Option<String>,
    #[serde(default)]
    pub arguments: Vec<PromptArgument>,
}

#[derive(Debug, Clone, Deserialize)]
pub struct PromptArgument {
    pub name: String,
    #[serde(default)]
    pub description: Option<String>,
    #[serde(default)]
    pub required: bool,
}

/// Tool call result
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CallToolResult {
    pub content: Vec<ContentBlock>,
    #[serde(default)]
    pub is_error: bool,
}

#[derive(Debug, Clone, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ContentBlock {
    Text { text: String },
    Image {
        data: String,
        // The enum-level rename_all only affects variant names, not fields
        #[serde(rename = "mimeType")]
        mime_type: String,
    },
    Resource { resource: ResourceContent },
}

#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ResourceContent {
    pub uri: String,
    #[serde(default)]
    pub mime_type: Option<String>,
    #[serde(default)]
    pub text: Option<String>,
    #[serde(default)]
    pub blob: Option<String>,
}

/// Read resource result
#[derive(Debug, Clone, Deserialize)]
pub struct ReadResourceResult {
    pub contents: Vec<ResourceContent>,
}

/// Get prompt result
#[derive(Debug, Clone, Deserialize)]
pub struct GetPromptResult {
    #[serde(default)]
    pub description: Option<String>,
    pub messages: Vec<PromptMessage>,
}

#[derive(Debug, Clone, Deserialize)]
pub struct PromptMessage {
    pub role: String,
    pub content: ContentBlock,
}
```
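The `code` field of `JsonRpcError` follows the JSON-RPC 2.0 specification, which predefines a handful of codes and reserves a range for server-defined errors. A small sketch of classifying a code is shown below; `RpcErrorKind` and `classify_rpc_error` are hypothetical helpers introduced for illustration and are not part of the plan.

```rust
/// Broad classes of JSON-RPC 2.0 error codes (hypothetical helper type).
#[derive(Debug, PartialEq, Eq)]
pub enum RpcErrorKind {
    /// -32700: invalid JSON was received
    ParseError,
    /// -32600: the JSON is not a valid request object
    InvalidRequest,
    /// -32601: the method does not exist or is unavailable
    MethodNotFound,
    /// -32602: invalid method parameters
    InvalidParams,
    /// -32603: internal JSON-RPC error
    InternalError,
    /// -32099..=-32000: reserved for implementation-defined server errors
    ServerError,
    /// Any other code is application-defined
    Application,
}

/// Map a raw error code to its class.
pub fn classify_rpc_error(code: i32) -> RpcErrorKind {
    match code {
        -32700 => RpcErrorKind::ParseError,
        -32600 => RpcErrorKind::InvalidRequest,
        -32601 => RpcErrorKind::MethodNotFound,
        -32602 => RpcErrorKind::InvalidParams,
        -32603 => RpcErrorKind::InternalError,
        -32099..=-32000 => RpcErrorKind::ServerError,
        _ => RpcErrorKind::Application,
    }
}
```

A client could use such a classification to decide, for example, that `MethodNotFound` means the server lacks a capability, whereas `InternalError` might be worth retrying.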
- [ ] **Step 2: Verify that it compiles**
Run: `cd crates/zclaw-protocols && cargo check`
Expected: compiles successfully
- [ ] **Step 3: Commit the MCP type definitions**
```bash
git add crates/zclaw-protocols/src/mcp_types.rs
git commit -m "feat(protocols): add MCP JSON-RPC type definitions

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>"
```
---
### Task 5.2: Create the MCP transport layer
**Files:**
- Create: `crates/zclaw-protocols/src/mcp_transport.rs`
- [ ] **Step 1: Create the transport trait and implementations**
```rust
//! MCP Transport layer - stdio and HTTP
use async_trait::async_trait;
use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader};
use tokio::process::{Child, ChildStdin, ChildStdout, Command};
use serde_json::Value;
use std::io;
use std::process::Stdio;
use zclaw_types::Result;
use zclaw_types::ZclawError;
use super::mcp_types::{JsonRpcRequest, JsonRpcResponse};
/// MCP Transport trait
#[async_trait]
pub trait McpTransport: Send + Sync {
/// Send a request and receive response
async fn request(&mut self, req: JsonRpcRequest) -> Result<JsonRpcResponse>;
/// Send a notification (no response expected)
async fn notify(&mut self, method: &str, params: Option<Value>) -> Result<()>;
/// Close the transport
async fn close(&mut self) -> Result<()>;
}
/// Stdio transport for local MCP servers
pub struct StdioTransport {
process: Child,
stdin: ChildStdin,
stdout: BufReader<ChildStdout>,
request_id: u64,
}
impl StdioTransport {
/// Start a new MCP server process and connect via stdio
pub async fn start(command: &str, args: &[&str]) -> Result<Self> {
let mut cmd = Command::new(command);
cmd.args(args)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::null());
let mut process = cmd.spawn()
.map_err(|e| ZclawError::McpError(format!("Failed to start MCP server: {}", e)))?;
let stdin = process.stdin.take()
.ok_or_else(|| ZclawError::McpError("Failed to open stdin".to_string()))?;
let stdout = process.stdout.take()
.ok_or_else(|| ZclawError::McpError("Failed to open stdout".to_string()))?;
Ok(Self {
process,
stdin,
stdout: BufReader::new(stdout),
request_id: 0,
})
}
fn next_request_id(&mut self) -> u64 {
self.request_id += 1;
self.request_id
}
}
#[async_trait]
impl McpTransport for StdioTransport {
async fn request(&mut self, req: JsonRpcRequest) -> Result<JsonRpcResponse> {
// Serialize and send request
let json = serde_json::to_string(&req)
.map_err(|e| ZclawError::McpError(format!("JSON serialize error: {}", e)))?;
let line = format!("Content-Length: {}\r\n\r\n{}", json.len(), json);
self.stdin.write_all(line.as_bytes()).await
.map_err(|e| ZclawError::McpError(format!("Write error: {}", e)))?;
self.stdin.flush().await
.map_err(|e| ZclawError::McpError(format!("Flush error: {}", e)))?;
// Read response
// Read Content-Length header
let mut header_line = String::new();
self.stdout.read_line(&mut header_line).await
.map_err(|e| ZclawError::McpError(format!("Read header error: {}", e)))?;
let content_length: usize = header_line
.strip_prefix("Content-Length: ")
.and_then(|s| s.trim().parse().ok())
.ok_or_else(|| ZclawError::McpError("Invalid Content-Length header".to_string()))?;
// Read empty line
let mut empty_line = String::new();
self.stdout.read_line(&mut empty_line).await
.map_err(|e| ZclawError::McpError(format!("Read separator error: {}", e)))?;
// Read body
let mut body = vec![0u8; content_length];
use tokio::io::AsyncReadExt;
self.stdout.read_exact(&mut body).await
.map_err(|e| ZclawError::McpError(format!("Read body error: {}", e)))?;
let body_str = String::from_utf8(body)
.map_err(|e| ZclawError::McpError(format!("UTF-8 decode error: {}", e)))?;
let response: JsonRpcResponse = serde_json::from_str(&body_str)
.map_err(|e| ZclawError::McpError(format!("JSON parse error: {}", e)))?;
Ok(response)
}
async fn notify(&mut self, method: &str, params: Option<Value>) -> Result<()> {
let id = self.next_request_id();
let req = JsonRpcRequest::new(id, method).with_params(params.unwrap_or(Value::Null));
let json = serde_json::to_string(&req)
.map_err(|e| ZclawError::McpError(format!("JSON serialize error: {}", e)))?;
let line = format!("Content-Length: {}\r\n\r\n{}", json.len(), json);
self.stdin.write_all(line.as_bytes()).await
.map_err(|e| ZclawError::McpError(format!("Write error: {}", e)))?;
self.stdin.flush().await
.map_err(|e| ZclawError::McpError(format!("Flush error: {}", e)))?;
Ok(())
}
async fn close(&mut self) -> Result<()> {
let _ = self.process.kill().await;
Ok(())
}
}
/// HTTP transport for remote MCP servers
pub struct HttpTransport {
base_url: String,
client: reqwest::Client,
request_id: u64,
}
impl HttpTransport {
pub fn new(base_url: impl Into<String>) -> Self {
Self {
base_url: base_url.into(),
client: reqwest::Client::new(),
request_id: 0,
}
}
fn next_request_id(&mut self) -> u64 {
self.request_id += 1;
self.request_id
}
}
#[async_trait]
impl McpTransport for HttpTransport {
async fn request(&mut self, req: JsonRpcRequest) -> Result<JsonRpcResponse> {
let response = self.client
.post(&self.base_url)
.json(&req)
.send()
.await
.map_err(|e| ZclawError::McpError(format!("HTTP request error: {}", e)))?;
if !response.status().is_success() {
return Err(ZclawError::McpError(format!("HTTP error: {}", response.status())));
}
let jsonrpc_response: JsonRpcResponse = response.json().await
.map_err(|e| ZclawError::McpError(format!("JSON parse error: {}", e)))?;
Ok(jsonrpc_response)
}
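async fn notify(&mut self, method: &str, params: Option<Value>) -> Result<()> {
    // Notifications carry no `id` and expect no JSON-RPC result; send as a fire-and-forget POST
    let mut msg = serde_json::json!({ "jsonrpc": "2.0", "method": method });
    if let Some(p) = params {
        msg["params"] = p;
    }
    self.client
        .post(&self.base_url)
        .json(&msg)
        .send()
        .await
        .map_err(|e| ZclawError::McpError(format!("HTTP notify error: {}", e)))?;
    Ok(())
}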
async fn close(&mut self) -> Result<()> {
Ok(())
}
}
```
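The MCP specification describes stdio messages as newline-delimited JSON. The following is a minimal, std-only sketch of that framing; `write_message` and `read_message` are hypothetical helper names, not part of `StdioTransport`, and the sketch assumes the JSON was produced by a compact serializer so it contains no raw newlines.

```rust
use std::io::{self, BufRead, Write};

/// Write one JSON-RPC message as a single newline-terminated line.
/// Rejects input containing a raw newline, since that would split the frame.
pub fn write_message<W: Write>(writer: &mut W, json: &str) -> io::Result<()> {
    if json.contains('\n') {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "message contains a raw newline",
        ));
    }
    writer.write_all(json.as_bytes())?;
    writer.write_all(b"\n")?;
    writer.flush()
}

/// Read the next non-empty line. `Ok(None)` means the peer closed the stream.
pub fn read_message<R: BufRead>(reader: &mut R) -> io::Result<Option<String>> {
    let mut line = String::new();
    loop {
        line.clear();
        if reader.read_line(&mut line)? == 0 {
            return Ok(None);
        }
        let trimmed = line.trim_end_matches(|c| c == '\r' || c == '\n');
        if !trimmed.is_empty() {
            return Ok(Some(trimmed.to_string()));
        }
    }
}
```

Because both helpers are generic over `Write` and `BufRead`, they can be exercised against an in-memory buffer without spawning a server process.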
- [ ] **Step 2: Add the McpError variant to ZclawError**
Check `crates/zclaw-types/src/error.rs` and make sure it contains:
```rust
pub enum ZclawError {
// ... existing variants
McpError(String),
}
```
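For orientation, here is a std-only sketch of how such string-carrying variants might be rendered. It is an illustrative stand-in, not the real `zclaw_types::ZclawError`: the actual enum has other variants, may use a derive macro for `Display`, and the message prefixes below are assumptions.

```rust
use std::fmt;

/// Illustrative stand-in showing only the variants this plan adds.
#[derive(Debug)]
pub enum ZclawError {
    McpError(String),
    HandError(String),
    SecurityError(String),
}

impl fmt::Display for ZclawError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // The prefixes are hypothetical; the real crate may word them differently
        match self {
            ZclawError::McpError(msg) => write!(f, "MCP error: {}", msg),
            ZclawError::HandError(msg) => write!(f, "Hand error: {}", msg),
            ZclawError::SecurityError(msg) => write!(f, "Security error: {}", msg),
        }
    }
}

impl std::error::Error for ZclawError {}
```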
- [ ] **Step 3: Verify that it compiles**
Run: `cd crates/zclaw-protocols && cargo check`
Expected: compiles successfully
- [ ] **Step 4: Commit the transport layer implementation**
```bash
git add crates/zclaw-protocols/src/mcp_transport.rs crates/zclaw-types/src/error.rs
git commit -m "feat(protocols): add MCP transport layer (stdio/HTTP)
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>"
```
---
## Chunk 6: MCP protocol - client implementation
### Task 6.1: Implement the full MCP client
**Files:**
- Create: `crates/zclaw-protocols/src/mcp_client.rs`
- Modify: `crates/zclaw-protocols/src/mcp.rs`
- Modify: `crates/zclaw-protocols/src/lib.rs`
- [ ] **Step 1: Create the MCP client**
```rust
//! MCP Client implementation
use async_trait::async_trait;
use serde::Deserialize; // needed by the local `#[derive(Deserialize)]` result structs
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use zclaw_types::{Result, ZclawError};
use super::mcp_transport::McpTransport;
use super::mcp_types::*;
use super::{McpClient, McpTool, McpResource, McpPrompt, McpToolCallRequest, McpToolCallResponse, McpResourceContent};
/// MCP Client configuration
#[derive(Debug, Clone)]
pub struct McpClientConfig {
pub name: String,
pub transport_type: McpTransportType,
}
#[derive(Debug, Clone)]
pub enum McpTransportType {
Stdio { command: String, args: Vec<String> },
Http { url: String },
}
/// Full MCP client implementation
pub struct FullMcpClient {
config: McpClientConfig,
transport: Arc<RwLock<Box<dyn McpTransport>>>,
server_capabilities: RwLock<Option<ServerCapabilities>>,
server_info: RwLock<Option<Implementation>>,
initialized: RwLock<bool>,
}
impl FullMcpClient {
/// Create and initialize a new MCP client
pub async fn connect(config: McpClientConfig) -> Result<Self> {
let transport: Box<dyn McpTransport> = match &config.transport_type {
McpTransportType::Stdio { command, args } => {
let args_ref: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
Box::new(
super::mcp_transport::StdioTransport::start(command, &args_ref).await?
)
}
McpTransportType::Http { url } => {
Box::new(super::mcp_transport::HttpTransport::new(url))
}
};
let client = Self {
config,
transport: Arc::new(RwLock::new(transport)),
server_capabilities: RwLock::new(None),
server_info: RwLock::new(None),
initialized: RwLock::new(false),
};
// Initialize connection
client.initialize().await?;
Ok(client)
}
/// Send initialize request
async fn initialize(&self) -> Result<()> {
let init_request = InitializeRequest::default();
let request = JsonRpcRequest::new(1, "initialize")
.with_params(serde_json::to_value(&init_request)
.map_err(|e| ZclawError::McpError(format!("Serialize error: {}", e)))?);
let mut transport = self.transport.write().await;
let response = transport.request(request).await?;
if let Some(error) = response.error {
return Err(ZclawError::McpError(format!("Initialize error: {}", error.message)));
}
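// `serde_json::Value` has no `.deserialize()` method; convert with `serde_json::from_value`
// (assumes `response.result` is an `Option<serde_json::Value>`)
let result: InitializeResult = serde_json::from_value(
    response.result
        .ok_or_else(|| ZclawError::McpError("No result in initialize response".to_string()))?,
)
.map_err(|e| ZclawError::McpError(format!("Parse error: {}", e)))?;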
// Store server info
*self.server_capabilities.write().await = Some(result.capabilities);
*self.server_info.write().await = Some(result.server_info);
// Send initialized notification
transport.notify("notifications/initialized", None).await?;
*self.initialized.write().await = true;
Ok(())
}
/// Check if server supports tools
pub async fn supports_tools(&self) -> bool {
self.server_capabilities.read().await
.as_ref()
.map(|c| c.tools.is_some())
.unwrap_or(false)
}
/// Check if server supports resources
pub async fn supports_resources(&self) -> bool {
self.server_capabilities.read().await
.as_ref()
.map(|c| c.resources.is_some())
.unwrap_or(false)
}
/// Check if server supports prompts
pub async fn supports_prompts(&self) -> bool {
self.server_capabilities.read().await
.as_ref()
.map(|c| c.prompts.is_some())
.unwrap_or(false)
}
}
#[async_trait]
impl McpClient for FullMcpClient {
async fn list_tools(&self) -> Result<Vec<McpTool>> {
if !self.supports_tools().await {
return Ok(Vec::new());
}
let request = JsonRpcRequest::new(2, "tools/list");
let response = self.transport.write().await.request(request).await?;
if let Some(error) = response.error {
return Err(ZclawError::McpError(format!("List tools error: {}", error.message)));
}
#[derive(Deserialize)]
struct ToolsListResult {
tools: Vec<Tool>,
}
let result: ToolsListResult = serde_json::from_value(
    response.result
        .ok_or_else(|| ZclawError::McpError("No result".to_string()))?,
)
.map_err(|e| ZclawError::McpError(format!("Parse error: {}", e)))?;
Ok(result.tools.into_iter().map(|t| McpTool {
name: t.name,
description: t.description.unwrap_or_default(),
input_schema: t.input_schema,
}).collect())
}
async fn call_tool(&self, request: McpToolCallRequest) -> Result<McpToolCallResponse> {
let params = serde_json::json!({
"name": request.name,
"arguments": request.arguments,
});
let rpc_request = JsonRpcRequest::new(3, "tools/call").with_params(params);
let response = self.transport.write().await.request(rpc_request).await?;
if let Some(error) = response.error {
return Err(ZclawError::McpError(format!("Call tool error: {}", error.message)));
}
let result: CallToolResult = serde_json::from_value(
    response.result
        .ok_or_else(|| ZclawError::McpError("No result".to_string()))?,
)
.map_err(|e| ZclawError::McpError(format!("Parse error: {}", e)))?;
Ok(McpToolCallResponse {
content: result.content.into_iter().map(|c| match c {
ContentBlock::Text { text } => super::McpContent::Text { text },
ContentBlock::Image { data, mime_type } => super::McpContent::Image { data, mime_type },
ContentBlock::Resource { resource } => super::McpContent::Resource {
resource: McpResourceContent {
uri: resource.uri,
mime_type: resource.mime_type,
text: resource.text,
blob: resource.blob,
},
},
}).collect(),
is_error: result.is_error,
})
}
async fn list_resources(&self) -> Result<Vec<McpResource>> {
if !self.supports_resources().await {
return Ok(Vec::new());
}
let request = JsonRpcRequest::new(4, "resources/list");
let response = self.transport.write().await.request(request).await?;
if let Some(error) = response.error {
return Err(ZclawError::McpError(format!("List resources error: {}", error.message)));
}
#[derive(Deserialize)]
struct ResourcesListResult {
resources: Vec<Resource>,
}
let result: ResourcesListResult = serde_json::from_value(
    response.result
        .ok_or_else(|| ZclawError::McpError("No result".to_string()))?,
)
.map_err(|e| ZclawError::McpError(format!("Parse error: {}", e)))?;
Ok(result.resources.into_iter().map(|r| McpResource {
uri: r.uri,
name: r.name,
description: r.description,
mime_type: r.mime_type,
}).collect())
}
async fn read_resource(&self, uri: &str) -> Result<McpResourceContent> {
let params = serde_json::json!({
"uri": uri,
});
let request = JsonRpcRequest::new(5, "resources/read").with_params(params);
let response = self.transport.write().await.request(request).await?;
if let Some(error) = response.error {
return Err(ZclawError::McpError(format!("Read resource error: {}", error.message)));
}
let result: ReadResourceResult = serde_json::from_value(
    response.result
        .ok_or_else(|| ZclawError::McpError("No result".to_string()))?,
)
.map_err(|e| ZclawError::McpError(format!("Parse error: {}", e)))?;
let content = result.contents.into_iter().next()
.ok_or_else(|| ZclawError::McpError("No content".to_string()))?;
Ok(McpResourceContent {
uri: content.uri,
mime_type: content.mime_type,
text: content.text,
blob: content.blob,
})
}
async fn list_prompts(&self) -> Result<Vec<McpPrompt>> {
if !self.supports_prompts().await {
return Ok(Vec::new());
}
let request = JsonRpcRequest::new(6, "prompts/list");
let response = self.transport.write().await.request(request).await?;
if let Some(error) = response.error {
return Err(ZclawError::McpError(format!("List prompts error: {}", error.message)));
}
#[derive(Deserialize)]
struct PromptsListResult {
prompts: Vec<Prompt>,
}
let result: PromptsListResult = serde_json::from_value(
    response.result
        .ok_or_else(|| ZclawError::McpError("No result".to_string()))?,
)
.map_err(|e| ZclawError::McpError(format!("Parse error: {}", e)))?;
Ok(result.prompts.into_iter().map(|p| McpPrompt {
name: p.name,
description: p.description.unwrap_or_default(),
arguments: p.arguments.into_iter().map(|a| super::McpPromptArgument {
name: a.name,
description: a.description.unwrap_or_default(),
required: a.required,
}).collect(),
}).collect())
}
async fn get_prompt(&self, name: &str, arguments: HashMap<String, String>) -> Result<String> {
let params = serde_json::json!({
"name": name,
"arguments": arguments,
});
let request = JsonRpcRequest::new(7, "prompts/get").with_params(params);
let response = self.transport.write().await.request(request).await?;
if let Some(error) = response.error {
return Err(ZclawError::McpError(format!("Get prompt error: {}", error.message)));
}
let result: GetPromptResult = serde_json::from_value(
    response.result
        .ok_or_else(|| ZclawError::McpError("No result".to_string()))?,
)
.map_err(|e| ZclawError::McpError(format!("Parse error: {}", e)))?;
// Convert messages to string
let prompt_text: String = result.messages.into_iter()
.filter_map(|m| match m.content {
ContentBlock::Text { text } => Some(format!("{}: {}", m.role, text)),
_ => None,
})
.collect::<Vec<_>>()
.join("\n");
Ok(prompt_text)
}
}
```
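The client above hardcodes a fixed request ID per method (1 through 7), so repeated calls reuse the same ID. One possible alternative, sketched here with a hypothetical `RequestIdGen` type that is not part of the plan, is an atomic counter that can be advanced through `&self`, which matches the `&self` receivers on the `McpClient` trait methods.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical helper: hands out unique, increasing JSON-RPC request IDs.
#[derive(Debug)]
pub struct RequestIdGen {
    next: AtomicU64,
}

impl RequestIdGen {
    /// IDs start at 1.
    pub fn new() -> Self {
        Self { next: AtomicU64::new(1) }
    }

    /// Returns the current ID and advances the counter.
    /// `fetch_add` is atomic, so concurrent callers never observe the same value.
    pub fn next_id(&self) -> u64 {
        self.next.fetch_add(1, Ordering::Relaxed)
    }
}
```

A field of this type on the client would let each call site write something like `JsonRpcRequest::new(self.ids.next_id(), "tools/list")`; the `ids` field name is likewise an assumption.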
- [ ] **Step 2: Update the lib.rs exports**
```rust
// crates/zclaw-protocols/src/lib.rs
pub mod mcp;
pub mod mcp_types;
pub mod mcp_transport;
pub mod mcp_client;
pub mod a2a;
pub use mcp::*;
pub use mcp_client::{FullMcpClient, McpClientConfig, McpTransportType};
```
- [ ] **Step 3: Verify that it compiles**
Run: `cd crates/zclaw-protocols && cargo check`
Expected: compiles successfully
- [ ] **Step 4: Commit the MCP client implementation**
```bash
git add crates/zclaw-protocols/src/mcp_client.rs crates/zclaw-protocols/src/lib.rs
git commit -m "feat(protocols): implement full MCP client with tools/resources/prompts
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>"
```
---
## Milestone M2: MCP protocol complete
- [ ] **Verify MCP functionality**
Create a test script that connects to filesystem-mcp:
```bash
# Install filesystem-mcp
npm install -g @modelcontextprotocol/server-filesystem
# Run the test
cargo test --package zclaw-protocols --test mcp_test
```
Expected: connects successfully and lists files
- [ ] **Commit the milestone**
```bash
git tag v0.2.0-m2-mcp
git push origin v0.2.0-m2-mcp
```
---
## Chunk 7: Browser Hand implementation
### Task 7.1: Add the chromiumoxide dependency (in place of playwright)
**Files:**
- Modify: `crates/zclaw-hands/Cargo.toml`
- [ ] **Step 1: Add the chromiumoxide dependency**
```toml
[dependencies]
# ... existing dependencies
chromiumoxide = { version = "0.7", features = ["tokio-runtime"] }
tokio-stream = "0.1"
```
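Note: `chromiumoxide` is used in place of playwright-rust because its API is more stable.
- [ ] **Step 2: Verify that the dependencies download**
Run: `cd crates/zclaw-hands && cargo fetch`
Expected: dependencies download successfully
- [ ] **Step 3: Commit the dependency update**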
```bash
git add crates/zclaw-hands/Cargo.toml crates/zclaw-hands/Cargo.lock
git commit -m "chore(hands): add chromiumoxide dependency for Browser Hand
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>"
```
---
### Task 7.2: Implement Browser Hand
**Files:**
- Create: `crates/zclaw-hands/src/hands/browser.rs`
- Modify: `crates/zclaw-hands/src/hands/mod.rs`
- [ ] **Step 1: Create the Browser Hand implementation**
```rust
//! Browser Hand - Browser automation capability
use async_trait::async_trait;
use chromiumoxide::{Browser, BrowserConfig};
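// chromiumoxide names these `CaptureScreenshotFormat` and `page::ScreenshotParams`;
// alias them to the names used in `screenshot()` below
use chromiumoxide::cdp::browser_protocol::page::CaptureScreenshotFormat as ScreenshotFormat;
use chromiumoxide::page::ScreenshotParams as ScreenshotParameters;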
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use zclaw_types::{AgentId, Result, ZclawError};
use crate::hand::{Hand, HandConfig, HandContext, HandResult, HandStatus};
/// Browser Hand - provides browser automation capabilities
pub struct BrowserHand {
config: HandConfig,
browser: Arc<Mutex<Option<Browser>>>,
page_url: Arc<Mutex<Option<String>>>,
}
impl BrowserHand {
pub fn new() -> Self {
Self {
config: HandConfig {
id: "browser".to_string(),
name: "Browser".to_string(),
description: "浏览器自动化能力包 - 自动化网页操作和数据采集".to_string(),
needs_approval: true,
dependencies: vec!["chromium".to_string()],
input_schema: Some(json!({
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["navigate", "click", "input", "screenshot", "wait", "evaluate", "close"]
},
"url": { "type": "string", "description": "URL to navigate to" },
"selector": { "type": "string", "description": "CSS selector" },
"text": { "type": "string", "description": "Text to input" },
"timeout": { "type": "number", "description": "Timeout in milliseconds" },
"script": { "type": "string", "description": "JavaScript to evaluate" },
},
"required": ["action"]
})),
tags: vec!["browser".to_string(), "automation".to_string(), "web-scraping".to_string()],
enabled: true,
},
browser: Arc::new(Mutex::new(None)),
page_url: Arc::new(Mutex::new(None)),
}
}
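async fn ensure_browser(&self) -> Result<tokio::sync::MappedMutexGuard<'_, Browser>> {
    use tokio_stream::StreamExt;
    let mut browser_guard = self.browser.lock().await;
    if browser_guard.is_none() {
        let config = BrowserConfig::builder()
            .window_size(1920, 1080)
            .request_timeout(Duration::from_secs(30))
            .build()
            .map_err(|e| ZclawError::HandError(format!("Browser config error: {}", e)))?;
        // `Browser::launch` returns the browser plus a handler stream that must be polled
        let (browser, mut handler) = Browser::launch(config)
            .await
            .map_err(|e| ZclawError::HandError(format!("Browser launch error: {}", e)))?;
        tokio::spawn(async move {
            while let Some(event) = handler.next().await {
                if event.is_err() {
                    break;
                }
            }
        });
        *browser_guard = Some(browser);
    }
    // `Browser` is not `Clone`; return a guard that derefs to it instead of a copy
    Ok(tokio::sync::MutexGuard::map(browser_guard, |b| {
        b.as_mut().expect("browser was initialized above")
    }))
}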
async fn navigate(&self, url: &str) -> Result<Value> {
let browser = self.ensure_browser().await?;
let page = browser.new_page("about:blank").await
.map_err(|e| ZclawError::HandError(format!("Page create error: {}", e)))?;
page.goto(url).await
.map_err(|e| ZclawError::HandError(format!("Navigate error: {}", e)))?;
page.wait_for_navigation().await
.map_err(|e| ZclawError::HandError(format!("Wait navigation error: {}", e)))?;
let title = page.get_title().await
.map_err(|e| ZclawError::HandError(format!("Get title error: {}", e)))?;
*self.page_url.lock().await = Some(url.to_string());
Ok(json!({
"success": true,
"url": url,
"title": title.unwrap_or_default()
}))
}
async fn click(&self, selector: &str) -> Result<Value> {
let browser = self.ensure_browser().await?;
let pages = browser.pages().await
.map_err(|e| ZclawError::HandError(format!("Get pages error: {}", e)))?;
let page = pages.first()
.ok_or_else(|| ZclawError::HandError("No active page".to_string()))?;
page.find_element(selector).await
.map_err(|e| ZclawError::HandError(format!("Find element error: {}", e)))?
.click().await
.map_err(|e| ZclawError::HandError(format!("Click error: {}", e)))?;
Ok(json!({
"success": true,
"selector": selector
}))
}
async fn input_text(&self, selector: &str, text: &str) -> Result<Value> {
let browser = self.ensure_browser().await?;
let pages = browser.pages().await
.map_err(|e| ZclawError::HandError(format!("Get pages error: {}", e)))?;
let page = pages.first()
.ok_or_else(|| ZclawError::HandError("No active page".to_string()))?;
page.find_element(selector).await
.map_err(|e| ZclawError::HandError(format!("Find element error: {}", e)))?
.type_str(text).await
.map_err(|e| ZclawError::HandError(format!("Input error: {}", e)))?;
Ok(json!({
"success": true,
"selector": selector,
"input_length": text.len()
}))
}
async fn screenshot(&self) -> Result<Value> {
let browser = self.ensure_browser().await?;
let pages = browser.pages().await
.map_err(|e| ZclawError::HandError(format!("Get pages error: {}", e)))?;
let page = pages.first()
.ok_or_else(|| ZclawError::HandError("No active page".to_string()))?;
let params = ScreenshotParameters::builder()
.format(ScreenshotFormat::Png)
.build();
let screenshot = page.screenshot(params).await
.map_err(|e| ZclawError::HandError(format!("Screenshot error: {}", e)))?;
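// base64 0.22 no longer provides the free function `base64::encode`; use the Engine API
use base64::Engine as _;
let base64 = base64::engine::general_purpose::STANDARD.encode(&screenshot);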
Ok(json!({
"success": true,
"format": "png",
"data": base64,
"size_bytes": screenshot.len()
}))
}
async fn wait_for(&self, selector: &str, timeout_ms: u64) -> Result<Value> {
let browser = self.ensure_browser().await?;
let pages = browser.pages().await
.map_err(|e| ZclawError::HandError(format!("Get pages error: {}", e)))?;
let page = pages.first()
.ok_or_else(|| ZclawError::HandError("No active page".to_string()))?;
// Poll `find_element` until it succeeds or the overall timeout elapses
tokio::time::timeout(Duration::from_millis(timeout_ms), async {
    while page.find_element(selector).await.is_err() {
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
})
.await
.map_err(|_| ZclawError::HandError(format!("Wait timeout for: {}", selector)))?;
Ok(json!({
"success": true,
"selector": selector
}))
}
async fn evaluate(&self, script: &str) -> Result<Value> {
let browser = self.ensure_browser().await?;
let pages = browser.pages().await
.map_err(|e| ZclawError::HandError(format!("Get pages error: {}", e)))?;
let page = pages.first()
.ok_or_else(|| ZclawError::HandError("No active page".to_string()))?;
let result = page.evaluate(script).await
.map_err(|e| ZclawError::HandError(format!("Evaluate error: {}", e)))?;
Ok(json!({
"success": true,
"result": result.value()
}))
}
async fn close(&self) -> Result<Value> {
let mut browser_guard = self.browser.lock().await;
// `Browser::close` takes `&mut self`, so the binding must be mutable
if let Some(mut browser) = browser_guard.take() {
browser.close().await
.map_err(|e| ZclawError::HandError(format!("Close error: {}", e)))?;
}
*self.page_url.lock().await = None;
Ok(json!({ "success": true }))
}
}
impl Default for BrowserHand {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl Hand for BrowserHand {
fn config(&self) -> &HandConfig {
&self.config
}
async fn execute(&self, _context: &HandContext, input: Value) -> Result<HandResult> {
#[derive(Deserialize)]
struct BrowserInput {
action: String,
url: Option<String>,
selector: Option<String>,
text: Option<String>,
timeout: Option<u64>,
script: Option<String>,
}
let input: BrowserInput = serde_json::from_value(input)
.map_err(|e| ZclawError::HandError(format!("Invalid input: {}", e)))?;
let result = match input.action.as_str() {
"navigate" => {
let url = input.url.ok_or_else(||
ZclawError::HandError("URL required for navigate".to_string()))?;
self.navigate(&url).await?
}
"click" => {
let selector = input.selector.ok_or_else(||
ZclawError::HandError("Selector required for click".to_string()))?;
self.click(&selector).await?
}
"input" => {
let selector = input.selector.ok_or_else(||
ZclawError::HandError("Selector required for input".to_string()))?;
let text = input.text.ok_or_else(||
ZclawError::HandError("Text required for input".to_string()))?;
self.input_text(&selector, &text).await?
}
"screenshot" => self.screenshot().await?,
"wait" => {
let selector = input.selector.ok_or_else(||
ZclawError::HandError("Selector required for wait".to_string()))?;
let timeout = input.timeout.unwrap_or(5000);
self.wait_for(&selector, timeout).await?
}
"evaluate" => {
let script = input.script.ok_or_else(||
ZclawError::HandError("Script required for evaluate".to_string()))?;
self.evaluate(&script).await?
}
"close" => self.close().await?,
_ => {
return Ok(HandResult::error(format!("Unknown action: {}", input.action)));
}
};
Ok(HandResult::success(result))
}
fn is_dependency_available(&self, dep: &str) -> bool {
match dep {
"chromium" => {
// Check if chromium/chrome is available
std::process::Command::new("chromium")
.arg("--version")
.output()
.is_ok() ||
std::process::Command::new("chrome")
.arg("--version")
.output()
.is_ok()
}
_ => true
}
}
}
```
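`is_dependency_available` above probes only the names `chromium` and `chrome`, and treats any successful spawn as available. A slightly broader probe could be sketched as follows; the candidate list and the `first_available` helper are assumptions for illustration, since binary names vary by platform and package manager.

```rust
use std::process::{Command, Stdio};

/// Candidate executable names; an assumed list, not taken from the plan.
pub const BROWSER_CANDIDATES: &[&str] =
    &["chromium", "chromium-browser", "google-chrome", "chrome"];

/// Returns the first candidate that can be spawned with `--version` and exits successfully.
/// A name that cannot be spawned at all (not on PATH) is skipped.
pub fn first_available<'a>(candidates: &[&'a str]) -> Option<&'a str> {
    candidates.iter().copied().find(|name| {
        Command::new(*name)
            .arg("--version")
            .stdin(Stdio::null())
            .stdout(Stdio::null())
            .stderr(Stdio::null())
            .status()
            .map(|status| status.success())
            .unwrap_or(false)
    })
}
```

Calling `first_available(BROWSER_CANDIDATES).is_some()` would then replace the two chained `Command` checks, and the returned name could be surfaced in an error message when no browser is found.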
- [ ] **Step 2: Add the base64 dependency**
Add to `crates/zclaw-hands/Cargo.toml`:
```toml
base64 = "0.22"
```
- [ ] **Step 3: Update the mod.rs exports**
```rust
// crates/zclaw-hands/src/hands/mod.rs
//! Hands - Autonomous capabilities
mod whiteboard;
mod slideshow;
mod speech;
mod quiz;
mod browser;
pub use whiteboard::*;
pub use slideshow::*;
pub use speech::*;
pub use quiz::*;
pub use browser::*;
```
- [ ] **Step 4: Add the HandError variant to ZclawError**
Make sure `crates/zclaw-types/src/error.rs` contains:
```rust
HandError(String),
```
- [ ] **Step 5: Verify that it compiles**
Run: `cd crates/zclaw-hands && cargo check`
Expected: compiles successfully
- [ ] **Step 6: Commit the Browser Hand implementation**
```bash
git add crates/zclaw-hands/src/hands/browser.rs crates/zclaw-hands/src/hands/mod.rs crates/zclaw-hands/Cargo.toml crates/zclaw-types/src/error.rs
git commit -m "feat(hands): implement Browser Hand with chromiumoxide
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>"
```
---
## Milestone M3: Browser Hand complete
- [ ] **Verify Browser Hand functionality**
Run: `cd crates/zclaw-hands && cargo test browser_hand -- --nocapture`
Expected: tests pass
- [ ] **Commit the milestone**
```bash
git tag v0.2.0-m3-browser
git push origin v0.2.0-m3-browser
```
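The verification step for this milestone runs `cargo test browser_hand`, but the plan does not list test cases. One part of `BrowserHand::execute` that can be tested without launching a browser is the required-field validation. The sketch below is a self-contained stand-in under that assumption: `BrowserAction`, `RawInput`, and `parse_action` are hypothetical names, and the error wording is illustrative rather than copied from the plan.

```rust
/// Hypothetical typed form of the `action` field accepted by the Browser Hand.
#[derive(Debug, PartialEq)]
pub enum BrowserAction {
    Navigate { url: String },
    Click { selector: String },
    Input { selector: String, text: String },
    Screenshot,
    Wait { selector: String, timeout_ms: u64 },
    Evaluate { script: String },
    Close,
}

/// Raw optional fields, mirroring the `BrowserInput` struct in the plan.
#[derive(Debug, Default)]
pub struct RawInput {
    pub action: String,
    pub url: Option<String>,
    pub selector: Option<String>,
    pub text: Option<String>,
    pub timeout: Option<u64>,
    pub script: Option<String>,
}

/// Validate that each action has the fields it needs before touching the browser.
pub fn parse_action(raw: RawInput) -> Result<BrowserAction, String> {
    let RawInput { action, url, selector, text, timeout, script } = raw;
    let missing = |field: &str| format!("{} required for {}", field, action);
    match action.as_str() {
        "navigate" => Ok(BrowserAction::Navigate {
            url: url.ok_or_else(|| missing("url"))?,
        }),
        "click" => Ok(BrowserAction::Click {
            selector: selector.ok_or_else(|| missing("selector"))?,
        }),
        "input" => Ok(BrowserAction::Input {
            selector: selector.ok_or_else(|| missing("selector"))?,
            text: text.ok_or_else(|| missing("text"))?,
        }),
        "screenshot" => Ok(BrowserAction::Screenshot),
        "wait" => Ok(BrowserAction::Wait {
            selector: selector.ok_or_else(|| missing("selector"))?,
            // Same 5000 ms default as the plan's `execute`
            timeout_ms: timeout.unwrap_or(5000),
        }),
        "evaluate" => Ok(BrowserAction::Evaluate {
            script: script.ok_or_else(|| missing("script"))?,
        }),
        "close" => Ok(BrowserAction::Close),
        other => Err(format!("Unknown action: {}", other)),
    }
}
```

Separating parsing from execution this way keeps the browser-free cases fast, while tests that actually drive Chromium can be gated separately.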
---
## Chunk 8: Tool security + release preparation
### Task 8.1: Create the security configuration file
**Files:**
- Create: `config/security.toml`
- [ ] **Step 1: Create the security configuration**
```toml
# ZCLAW Security Configuration
# Controls which commands and operations are allowed
[shell_exec]
# Enable shell command execution
enabled = true
# Default timeout in seconds
default_timeout = 60
# Maximum output size in bytes
max_output_size = 1048576 # 1MB
# Whitelist of allowed commands
# If whitelist is non-empty, only these commands are allowed
allowed_commands = [
"git",
"npm",
"pnpm",
"node",
"cargo",
"rustc",
"python",
"python3",
"pip",
"ls",
"cat",
"echo",
"mkdir",
"rm",
"cp",
"mv",
"grep",
"find",
"head",
"tail",
"wc",
]
# Blacklist of dangerous commands (always blocked)
blocked_commands = [
"rm -rf /",
"dd",
"mkfs",
"format",
"shutdown",
"reboot",
"init",
"systemctl",
]
[file_read]
enabled = true
# Allowed directory prefixes (empty = allow all)
allowed_paths = []
# Blocked paths (always blocked)
blocked_paths = [
"/etc/shadow",
"/etc/passwd",
"~/.ssh",
"~/.gnupg",
]
[file_write]
enabled = true
# Maximum file size in bytes (10MB)
max_file_size = 10485760
# Blocked paths
blocked_paths = [
"/etc",
"/usr",
"/bin",
"/sbin",
"C:\\Windows",
"C:\\Program Files",
]
[web_fetch]
enabled = true
# Request timeout in seconds
timeout = 30
# Maximum response size in bytes (10MB)
max_response_size = 10485760
# Block internal/private IP ranges (SSRF protection)
block_private_ips = true
# Allowed domains (empty = allow all)
allowed_domains = []
# Blocked domains
blocked_domains = []
```
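The `block_private_ips = true` setting implies a check on the resolved address before a fetch is made. A minimal std-only sketch of such a check follows; `is_blocked_ip` is a hypothetical function name, and the exact set of ranges to block is an assumption that the real implementation may widen or narrow.

```rust
use std::net::{IpAddr, Ipv4Addr};

/// IPv4 ranges treated as internal: private, loopback, link-local,
/// unspecified (0.0.0.0) and broadcast.
fn is_blocked_v4(v4: Ipv4Addr) -> bool {
    v4.is_private()
        || v4.is_loopback()
        || v4.is_link_local()
        || v4.is_unspecified()
        || v4.is_broadcast()
}

/// Returns true when the address should be refused for SSRF protection.
pub fn is_blocked_ip(ip: IpAddr) -> bool {
    match ip {
        IpAddr::V4(v4) => is_blocked_v4(v4),
        IpAddr::V6(v6) => {
            // IPv4-mapped addresses (::ffff:a.b.c.d) are checked as IPv4
            if let Some(v4) = v6.to_ipv4_mapped() {
                return is_blocked_v4(v4);
            }
            let first = v6.segments()[0];
            v6.is_loopback()
                || v6.is_unspecified()
                || (first & 0xfe00) == 0xfc00 // unique local, fc00::/7
                || (first & 0xffc0) == 0xfe80 // link-local, fe80::/10
        }
    }
}
```

The check needs to run on the address the HTTP client actually connects to (after DNS resolution and after each redirect); validating only the hostname string would miss names that resolve to internal addresses.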
- [ ] **Step 2: Commit the security configuration**
```bash
git add config/security.toml
git commit -m "feat(config): add security configuration file
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>"
```
---
### Task 8.2: Implement tool security validation
**Files:**
- Modify: `crates/zclaw-runtime/src/tool/builtin/shell_exec.rs`
- [ ] **Step 1: Implement command whitelist validation**
```rust
//! Shell execution tool with security controls
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::process::Command;
use std::time::{Duration, Instant};
use tokio::time::timeout;
use zclaw_types::{Result, ZclawError};
use super::{BuiltinTool, ToolContext, ToolResult};
/// Security configuration for shell execution
#[derive(Debug, Clone, Deserialize)]
pub struct ShellSecurityConfig {
pub enabled: bool,
pub default_timeout: u64,
pub max_output_size: usize,
pub allowed_commands: Vec<String>,
pub blocked_commands: Vec<String>,
}
impl Default for ShellSecurityConfig {
fn default() -> Self {
Self {
enabled: true,
default_timeout: 60,
max_output_size: 1024 * 1024, // 1MB
allowed_commands: vec![
"git".to_string(), "npm".to_string(), "pnpm".to_string(),
"node".to_string(), "cargo".to_string(), "rustc".to_string(),
"python".to_string(), "python3".to_string(), "pip".to_string(),
"ls".to_string(), "cat".to_string(), "echo".to_string(),
],
blocked_commands: vec![
"rm -rf /".to_string(), "dd".to_string(), "mkfs".to_string(),
"shutdown".to_string(), "reboot".to_string(),
],
}
}
}
impl ShellSecurityConfig {
/// Load from config file
pub fn load() -> Self {
// Try to load from config/security.toml
if let Ok(content) = std::fs::read_to_string("config/security.toml") {
if let Ok(config) = toml::from_str::<SecurityConfigFile>(&content) {
return config.shell_exec;
}
}
Self::default()
}
/// Check if a command is allowed
pub fn is_command_allowed(&self, command: &str) -> Result<()> {
// Parse the base command
let base_cmd = command.split_whitespace().next().unwrap_or("");
// Check blocked commands first
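for blocked in &self.blocked_commands {
    let hit = if blocked.contains(char::is_whitespace) {
        // Multi-word entries (e.g. "rm -rf /") match as a command-line prefix
        command.trim_start().starts_with(blocked.as_str())
    } else {
        // Single-word entries match the executable name only, so that
        // "dd" does not block "git add" and "init" does not block "git init"
        base_cmd == blocked.as_str()
    };
    if hit {
        return Err(ZclawError::SecurityError(
            format!("Command blocked: {}", blocked)
        ));
    }
}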
// If whitelist is non-empty, check against it
if !self.allowed_commands.is_empty() {
let allowed: HashSet<&str> = self.allowed_commands
.iter()
.map(|s| s.as_str())
.collect();
if !allowed.contains(base_cmd) {
return Err(ZclawError::SecurityError(
format!("Command not in whitelist: {}", base_cmd)
));
}
}
Ok(())
}
}
#[derive(Debug, Deserialize)]
struct SecurityConfigFile {
shell_exec: ShellSecurityConfig,
}
/// Shell execution tool
pub struct ShellExecTool {
config: ShellSecurityConfig,
}
impl ShellExecTool {
pub fn new() -> Self {
Self {
config: ShellSecurityConfig::load(),
}
}
}
impl Default for ShellExecTool {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl BuiltinTool for ShellExecTool {
fn name(&self) -> &str {
"shell_exec"
}
fn description(&self) -> &str {
"Execute shell commands with security controls"
}
fn input_schema(&self) -> serde_json::Value {
serde_json::json!({
"type": "object",
"properties": {
"command": {
"type": "string",
"description": "The shell command to execute"
},
"timeout": {
"type": "number",
"description": "Timeout in seconds (default: 60)"
}
},
"required": ["command"]
})
}
async fn execute(&self, _context: &ToolContext, input: serde_json::Value) -> Result<ToolResult> {
#[derive(Deserialize)]
struct Input {
command: String,
#[serde(default = "default_timeout")]
timeout: u64,
}
fn default_timeout() -> u64 { 60 }
let input: Input = serde_json::from_value(input)
.map_err(|e| ZclawError::ToolError(format!("Invalid input: {}", e)))?;
// Security check
self.config.is_command_allowed(&input.command)?;
// Determine timeout
let timeout_secs = input.timeout.min(self.config.default_timeout * 2);
let timeout_duration = Duration::from_secs(timeout_secs);
// Execute command
let start = Instant::now();
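let output = timeout(timeout_duration, async {
    #[cfg(target_os = "windows")]
    let (shell, flag) = ("cmd", "/C");
    #[cfg(not(target_os = "windows"))]
    let (shell, flag) = ("sh", "-c");
    // Use tokio's async process API: a blocking `std::process::Command::output()`
    // never yields, so the surrounding `timeout` could not interrupt it
    tokio::process::Command::new(shell)
        .args([flag, input.command.as_str()])
        .kill_on_drop(true) // kill the child if the timeout drops this future
        .output()
        .await
})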
.await
.map_err(|_| ZclawError::ToolError(format!("Command timed out after {}s", timeout_secs)))?
.map_err(|e| ZclawError::ToolError(format!("Failed to execute command: {}", e)))?;
let duration_ms = start.elapsed().as_millis() as u64;
// Truncate output if too large
let stdout = String::from_utf8_lossy(&output.stdout);
let stderr = String::from_utf8_lossy(&output.stderr);
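let stdout_truncated: &str = if stdout.len() > self.config.max_output_size {
    // Back up to a char boundary so slicing cannot panic inside a multi-byte character
    let mut end = self.config.max_output_size;
    while !stdout.is_char_boundary(end) {
        end -= 1;
    }
    &stdout[..end]
} else {
    &stdout
};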
Ok(ToolResult {
success: output.status.success(),
output: serde_json::json!({
"stdout": stdout_truncated,
"stderr": stderr,
"exit_code": output.status.code(),
"duration_ms": duration_ms,
}),
error: if output.status.success() {
None
} else {
Some(format!("Exit code: {:?}", output.status.code()))
},
})
}
}
```
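The whitelist above inspects only the first whitespace-separated token, while the full string is handed to `sh -c` (or `cmd /C`). A command such as `git status; rm -rf ~` would therefore pass the whitelist check. One conservative mitigation, sketched here with a hypothetical `has_shell_metachar` helper and an assumed character set, is to reject commands containing shell control characters before the whitelist check.

```rust
/// Characters that let a POSIX shell run more than the first command
/// or redirect its I/O. The exact set is an assumption, chosen to be conservative.
const SHELL_METACHARS: &[char] = &[';', '|', '&', '`', '$', '>', '<', '\n', '(', ')'];

/// Returns true if the command contains any shell control character.
pub fn has_shell_metachar(command: &str) -> bool {
    command.contains(SHELL_METACHARS)
}
```

This also rejects legitimate pipelines and redirections. If those are needed, an alternative design is to split the command into arguments and spawn the executable directly instead of going through a shell.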
- [ ] **Step 2: Add SecurityError to ZclawError**
Add to `crates/zclaw-types/src/error.rs`:
```rust
SecurityError(String),
```
- [ ] **Step 3: Verify that it compiles**
Run: `cd crates/zclaw-runtime && cargo check`
Expected: compiles successfully
- [ ] **Step 4: Commit the tool security implementation**
```bash
git add crates/zclaw-runtime/src/tool/builtin/shell_exec.rs crates/zclaw-types/src/error.rs
git commit -m "feat(runtime): implement shell command security whitelist
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>"
```
---
## Milestone M4: Tool security complete
- [ ] **Verify the security features**
Run: `cd crates/zclaw-runtime && cargo test shell_exec -- --nocapture`
Expected: security tests pass and malicious commands are blocked
- [ ] **Commit the milestone**
```bash
git tag v0.2.0-m4-security
git push origin v0.2.0-m4-security
```
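The M4 verification expects malicious commands to be blocked, but the plan does not spell out the cases. The sketch below is a self-contained stand-in for the allow/block decision, useful for deciding which cases a `shell_exec` test should cover. `Policy` and `check` are hypothetical names, and the matching rule (exact match on the executable name for single-word entries, prefix match for multi-word entries) is one possible interpretation of the blocklist.

```rust
/// Hypothetical stand-in for the shell security settings.
pub struct Policy {
    pub allowed: Vec<String>,
    pub blocked: Vec<String>,
}

impl Policy {
    /// Returns Ok(()) when the command may run, or a message explaining the refusal.
    pub fn check(&self, command: &str) -> Result<(), String> {
        let command = command.trim();
        let base = command.split_whitespace().next().unwrap_or("");
        for entry in &self.blocked {
            let hit = if entry.contains(char::is_whitespace) {
                // Multi-word entries match as a prefix of the whole command line
                command.starts_with(entry.as_str())
            } else {
                // Single-word entries match the executable name exactly
                base == entry.as_str()
            };
            if hit {
                return Err(format!("Command blocked: {}", entry));
            }
        }
        // An empty whitelist means "allow anything not blocked"
        if !self.allowed.is_empty() && !self.allowed.iter().any(|a| a.as_str() == base) {
            return Err(format!("Command not in whitelist: {}", base));
        }
        Ok(())
    }
}
```

Useful cases include a blocked executable (`dd`), a blocked multi-word prefix (`rm -rf /`), an executable missing from the whitelist, and benign commands that merely contain a blocked word as a substring (`git add`, `git init`).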
---
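## Release readiness checklist
### Week 7-8: Final acceptance
- [ ] **Functional acceptance**
- [ ] Streaming responses: messages render character by character, latency < 500ms
- [ ] MCP protocol: connects to filesystem-mcp and reads a file successfully
- [ ] Browser Hand: opens a page, clicks, and takes a screenshot successfully
- [ ] Tool security: malicious commands are blocked
- [ ] Core chat: send a message, receive a response, no crashes
- [ ] **Documentation acceptance**
- [ ] User manual updated
- [ ] CHANGELOG written
- [ ] README updated
- [ ] **Packaging acceptance**
- [ ] Windows installer builds successfully
- [ ] Self-signed certificate configured
- [ ] Installation test passes
- [ ] **Release acceptance**
- [ ] GitHub Release draft prepared
- [ ] Feedback channel set up (GitHub Issues)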
---
## Final release commands
```bash
# Build the release version
cd desktop && pnpm tauri build
# Create the tag
git tag -a v0.2.0 -m "ZCLAW v0.2.0 - Streaming, MCP, Browser Hand"
# Push the tag
git push origin v0.2.0
# Upload to GitHub Releases
gh release create v0.2.0 \
./desktop/src-tauri/target/release/bundle/msi/*.msi \
./desktop/src-tauri/target/release/bundle/nsis/*.exe \
--title "ZCLAW v0.2.0" \
--notes-file CHANGELOG.md
```
---
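## Risk contingency plan
| Risk | Contingency |
|------|-------------|
| Streaming responses slip | Fall back to a simplified SSE approach |
| MCP compatibility issues | Prioritize filesystem-mcp support |
| Browser dependency issues | Prompt the user to install Chromium |
| Insufficient test coverage | Extend Week 6 to add testing time |
| Crash after release | Ship a hotfix within 24h |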