ZCLAW v0.2.0 Release Implementation Plan
For agentic workers: REQUIRED: Use superpowers:subagent-driven-development (if subagents available) or superpowers:executing-plans to implement this plan. Steps use checkbox (- [ ]) syntax for tracking.
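Goal: Ship the official ZCLAW v0.2.0 release, including streaming responses, the MCP protocol, the Browser Hand and tool security features.
Architecture: A Tauri-based desktop app with a Rust backend and a React frontend. Streaming responses are delivered through the Tauri event system; MCP uses the JSON-RPC 2.0 protocol; the Browser Hand is built on playwright-rust.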
Tech Stack: Rust, Tauri 2.x, React 19, TypeScript, playwright-rust, reqwest, tokio
Spec Document: docs/superpowers/specs/2026-03-24-v0.2.0-release-plan-design.md
File Structure
New files
crates/zclaw-runtime/
├── src/stream.rs # streaming response types (modified)
├── src/driver/streaming.rs # new - streaming driver implementation
crates/zclaw-protocols/
├── src/mcp_client.rs # new - MCP client implementation
├── src/mcp_types.rs # new - MCP JSON-RPC types
├── src/mcp_transport.rs # new - MCP transport layer (stdio/HTTP/WS)
crates/zclaw-hands/
├── src/hands/browser.rs # new - Browser Hand implementation
config/
├── security.toml # new - security configuration
desktop/src/
├── lib/streaming-client.ts # new - streaming client
Modified files
crates/zclaw-runtime/src/driver/mod.rs # add stream() to the LlmDriver trait
crates/zclaw-runtime/src/driver/anthropic.rs # Anthropic streaming implementation
crates/zclaw-runtime/src/driver/openai.rs # OpenAI streaming implementation
crates/zclaw-runtime/src/loop_runner.rs # streaming loop implementation
crates/zclaw-protocols/src/mcp.rs # complete MCP implementation
crates/zclaw-protocols/src/lib.rs # export the new modules
crates/zclaw-hands/src/hands/mod.rs # export Browser Hand
crates/zclaw-hands/Cargo.toml # add the playwright dependency
desktop/src/store/chatStore.ts # streaming event handling
desktop/src-tauri/src/lib.rs # Tauri streaming command
Chunk 1: Streaming Responses - Backend Trait and Types
Task 1.1: Define the streaming response types
Files:
- Modify: crates/zclaw-runtime/src/stream.rs
- Step 1: Add the streaming response type definitions
Add to stream.rs:
//! Streaming response types
use serde::{Deserialize, Serialize};
/// Stream chunk emitted during streaming
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum StreamChunk {
/// Text delta
TextDelta { delta: String },
/// Thinking delta (for extended thinking models)
ThinkingDelta { delta: String },
/// Tool use started
ToolUseStart { id: String, name: String },
/// Tool use input delta
ToolUseDelta { id: String, delta: String },
/// Tool use completed
ToolUseEnd { id: String, input: serde_json::Value },
/// Stream completed
Complete {
input_tokens: u32,
output_tokens: u32,
stop_reason: String,
},
/// Error occurred
Error { message: String },
}
/// Streaming event for Tauri emission
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamEvent {
/// Session ID for routing
pub session_id: String,
/// Agent ID for routing
pub agent_id: String,
/// The chunk content
pub chunk: StreamChunk,
}
impl StreamEvent {
pub fn new(session_id: impl Into<String>, agent_id: impl Into<String>, chunk: StreamChunk) -> Self {
Self {
session_id: session_id.into(),
agent_id: agent_id.into(),
chunk,
}
}
}
- Step 2: Verify that it compiles
Run: cd crates/zclaw-runtime && cargo check
Expected: Compiles successfully with no errors
- Step 3: Commit the type definitions
git add crates/zclaw-runtime/src/stream.rs
git commit -m "feat(runtime): add streaming response types
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>"
Task 1.2: Extend the LlmDriver trait
Files:
- Modify: crates/zclaw-runtime/src/driver/mod.rs
- Step 1: Add the stream method to the trait
Add this method to the LlmDriver trait:
use futures::Stream;
use std::pin::Pin;
use crate::stream::StreamChunk;
/// LLM Driver trait - unified interface for all providers
#[async_trait]
pub trait LlmDriver: Send + Sync {
/// Get the provider name
fn provider(&self) -> &str;
/// Send a completion request
async fn complete(&self, request: CompletionRequest) -> Result<CompletionResponse>;
/// Send a streaming completion request
/// Returns a stream of chunks
fn stream(
&self,
request: CompletionRequest,
) -> Pin<Box<dyn Stream<Item = Result<StreamChunk>> + Send + '_>>;
/// Check if the driver is properly configured
fn is_configured(&self) -> bool;
}
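- Step 2: Add the futures dependency to Cargo.toml
Check whether crates/zclaw-runtime/Cargo.toml already depends on futures; if not, add:
futures = "0.3"
- Step 3: Verify compilation (expected to fail)
Run: cd crates/zclaw-runtime && cargo check 2>&1
Expected: Compilation fails because the drivers do not implement stream() yet
- Step 4: Commit the trait extension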
git add crates/zclaw-runtime/src/driver/mod.rs crates/zclaw-runtime/Cargo.toml
git commit -m "feat(runtime): add stream() method to LlmDriver trait
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>"
Chunk 2: Streaming Responses - Anthropic Driver Implementation
Task 2.1: Implement the Anthropic streaming API
Files:
- Modify: crates/zclaw-runtime/src/driver/anthropic.rs
- Step 1: Add the streaming event types
Add the SSE parsing types to anthropic.rs (AnthropicContentBlock is assumed to be the driver's existing content-block type, with block_type, id and name fields):
use futures::{Stream, StreamExt};
use crate::stream::StreamChunk;
use std::pin::Pin;
/// SSE event from the Anthropic API. Which fields are present depends on `type`.
#[derive(Debug, Deserialize)]
struct AnthropicStreamEvent {
#[serde(rename = "type")]
event_type: String,
#[serde(default)]
index: Option<u32>,
/// content_block_delta: text / thinking / partial_json; message_delta: stop_reason
#[serde(default)]
delta: Option<AnthropicDelta>,
#[serde(default)]
content_block: Option<AnthropicContentBlock>,
/// message_start: carries the input token usage
#[serde(default)]
message: Option<AnthropicStreamMessage>,
/// message_delta: carries the output token usage at the top level
#[serde(default)]
usage: Option<AnthropicStreamUsage>,
/// error: carries the error details
#[serde(default)]
error: Option<AnthropicStreamError>,
}
#[derive(Debug, Deserialize)]
struct AnthropicDelta {
#[serde(default)]
text: Option<String>,
#[serde(default)]
thinking: Option<String>,
#[serde(default)]
partial_json: Option<String>,
#[serde(default)]
stop_reason: Option<String>,
}
#[derive(Debug, Deserialize)]
struct AnthropicStreamMessage {
#[serde(default)]
usage: Option<AnthropicStreamUsage>,
}
/// message_delta may report only output_tokens, so both counts default to 0
#[derive(Debug, Deserialize)]
struct AnthropicStreamUsage {
#[serde(default)]
input_tokens: u32,
#[serde(default)]
output_tokens: u32,
}
#[derive(Debug, Deserialize)]
struct AnthropicStreamError {
message: String,
}
- Step 2: Implement the stream method
Anthropic reports the input token count in message_start, and the stop_reason (inside delta) plus the output token count (top-level usage) in message_delta. The stream ends with a message_stop event, not a [DONE] line.
fn stream(
&self,
request: CompletionRequest,
) -> Pin<Box<dyn Stream<Item = Result<StreamChunk>> + Send + '_>> {
Box::pin(async_stream::stream! {
let mut stream_request = self.build_api_request(&request);
stream_request.stream = true;
let response = match self.client
.post(format!("{}/v1/messages", self.base_url))
.header("x-api-key", self.api_key.expose_secret())
.header("anthropic-version", "2023-06-01")
.header("content-type", "application/json")
.json(&stream_request)
.send()
.await
{
Ok(r) => r,
Err(e) => {
yield Err(ZclawError::LlmError(format!("HTTP request failed: {}", e)));
return;
}
};
if !response.status().is_success() {
let status = response.status();
let body = response.text().await.unwrap_or_default();
yield Err(ZclawError::LlmError(format!("API error {}: {}", status, body)));
return;
}
let mut byte_stream = response.bytes_stream();
// A network chunk can end mid-line (or mid UTF-8 character), so buffer raw
// bytes and only decode complete lines.
let mut line_buffer: Vec<u8> = Vec::new();
let mut current_tool_id: Option<String> = None;
let mut tool_input_buffer = String::new();
let mut input_tokens = 0u32;
while let Some(chunk_result) = byte_stream.next().await {
let chunk = match chunk_result {
Ok(c) => c,
Err(e) => {
// The connection is broken; stop instead of polling a failed stream
yield Err(ZclawError::LlmError(format!("Stream error: {}", e)));
return;
}
};
line_buffer.extend_from_slice(&chunk);
while let Some(pos) = line_buffer.iter().position(|&b| b == b'\n') {
let raw: Vec<u8> = line_buffer.drain(..=pos).collect();
let line = String::from_utf8_lossy(&raw);
// Only `data:` lines carry JSON; `event:` lines repeat the type field
let data = match line.trim_end().strip_prefix("data: ") {
Some(d) => d,
None => continue,
};
let event = match serde_json::from_str::<AnthropicStreamEvent>(data) {
Ok(event) => event,
Err(e) => {
// Log but don't yield an error for parse failures
tracing::warn!("Failed to parse SSE event: {} - {}", e, data);
continue;
}
};
match event.event_type.as_str() {
"message_start" => {
if let Some(usage) = event.message.and_then(|m| m.usage) {
input_tokens = usage.input_tokens;
}
}
"content_block_start" => {
if let Some(block) = event.content_block {
if block.block_type == "tool_use" {
let id = block.id.unwrap_or_default();
current_tool_id = Some(id.clone());
tool_input_buffer.clear();
yield Ok(StreamChunk::ToolUseStart {
id,
name: block.name.unwrap_or_default(),
});
}
}
}
"content_block_delta" => {
if let Some(delta) = event.delta {
if let Some(text) = delta.text {
yield Ok(StreamChunk::TextDelta { delta: text });
}
if let Some(thinking) = delta.thinking {
yield Ok(StreamChunk::ThinkingDelta { delta: thinking });
}
if let Some(json) = delta.partial_json {
tool_input_buffer.push_str(&json);
}
}
}
"content_block_stop" => {
if let Some(id) = current_tool_id.take() {
let input: serde_json::Value = serde_json::from_str(&tool_input_buffer)
.unwrap_or(serde_json::Value::Object(Default::default()));
tool_input_buffer.clear();
yield Ok(StreamChunk::ToolUseEnd { id, input });
}
}
"message_delta" => {
// stop_reason is inside `delta`; output usage is a top-level field
if let Some(stop_reason) = event.delta.and_then(|d| d.stop_reason) {
yield Ok(StreamChunk::Complete {
input_tokens,
output_tokens: event.usage.map(|u| u.output_tokens).unwrap_or(0),
stop_reason,
});
}
}
"error" => {
yield Ok(StreamChunk::Error {
message: event.error
.map(|e| e.message)
.unwrap_or_else(|| "Stream error".to_string()),
});
}
// ping, message_stop and unknown event types need no handling
_ => {}
}
}
}
})
}
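An SSE response body arrives as arbitrary byte chunks, so a single `data:` line, or a single multi-byte UTF-8 character (common in Chinese replies), can be split across two chunks. Decoding each chunk on its own with `from_utf8_lossy` and splitting it on newlines would then corrupt or drop events. The std-only sketch below isolates the buffering step. `SseLineBuffer` is a hypothetical helper, not an existing type in the codebase, and it handles only `data:` lines (no multi-line events and no `id:`/`retry:` fields).

```rust
/// Buffers raw SSE bytes and returns the payloads of complete `data:` lines.
#[derive(Debug, Default)]
pub struct SseLineBuffer {
    /// Bytes received but not yet terminated by '\n'
    pending: Vec<u8>,
}

impl SseLineBuffer {
    /// Feeds one network chunk and returns every `data:` payload it completed.
    pub fn push(&mut self, chunk: &[u8]) -> Vec<String> {
        self.pending.extend_from_slice(chunk);
        let mut payloads = Vec::new();
        while let Some(pos) = self.pending.iter().position(|&b| b == b'\n') {
            // Take the line including its '\n'. Decoding is safe now because the
            // byte 0x0A never occurs inside a multi-byte UTF-8 sequence.
            let raw: Vec<u8> = self.pending.drain(..=pos).collect();
            let decoded = String::from_utf8_lossy(&raw);
            let line = decoded.trim_end_matches('\n').trim_end_matches('\r');
            // SSE allows an optional single space after "data:".
            if let Some(data) = line.strip_prefix("data:") {
                payloads.push(data.strip_prefix(' ').unwrap_or(data).to_string());
            }
        }
        payloads
    }
}
```

Buffering bytes rather than `String`s keeps the split-character case correct without any special UTF-8 handling, because decoding only ever happens on whole lines.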
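- Step 3: Add the async-stream and tracing dependencies
Add to crates/zclaw-runtime/Cargo.toml:
async-stream = "0.3"
tracing = "0.1"
Also make sure the existing reqwest dependency enables its "stream" feature, because Response::bytes_stream() is only available with it.
- Step 4: Verify that it compiles
Run: cd crates/zclaw-runtime && cargo check
Expected: Compiles successfully
- Step 5: Commit the Anthropic streaming implementation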
git add crates/zclaw-runtime/src/driver/anthropic.rs crates/zclaw-runtime/Cargo.toml
git commit -m "feat(runtime): implement streaming for Anthropic driver
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>"
Task 2.2: Implement the OpenAI streaming API
Files:
- Modify: crates/zclaw-runtime/src/driver/openai.rs
- Step 1: Read the existing OpenAI driver code
Run: cat crates/zclaw-runtime/src/driver/openai.rs | head -100
Expected: An overview of the existing implementation's structure
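- Step 2: Add the OpenAI streaming implementation
Add a similar stream method to openai.rs, adapted to OpenAI's SSE format. OpenAI ends the stream with a `data: [DONE]` line and sends a tool call's id and name only in that call's first delta; later deltas carry just the call's index and an arguments fragment, so the driver tracks calls by index:
fn stream(
&self,
request: CompletionRequest,
) -> Pin<Box<dyn Stream<Item = Result<StreamChunk>> + Send + '_>> {
Box::pin(async_stream::stream! {
let mut stream_request = self.build_api_request(&request);
// Assumes the OpenAI request struct has a `stream` field, like the Anthropic one
stream_request.stream = true;
let response = match self.client
.post(format!("{}/v1/chat/completions", self.base_url))
.header("Authorization", format!("Bearer {}", self.api_key.expose_secret()))
.header("Content-Type", "application/json")
.json(&stream_request)
.send()
.await
{
Ok(r) => r,
Err(e) => {
yield Err(ZclawError::LlmError(format!("HTTP request failed: {}", e)));
return;
}
};
if !response.status().is_success() {
let status = response.status();
let body = response.text().await.unwrap_or_default();
yield Err(ZclawError::LlmError(format!("API error {}: {}", status, body)));
return;
}
let mut byte_stream = response.bytes_stream();
// Buffer raw bytes so lines (and UTF-8 characters) split across chunks stay intact
let mut line_buffer: Vec<u8> = Vec::new();
// (id, accumulated arguments) per tool call, indexed by the delta's `index`
let mut tool_calls: Vec<(String, String)> = Vec::new();
let mut finish_reason: Option<String> = None;
let mut input_tokens = 0u32;
let mut output_tokens = 0u32;
while let Some(chunk_result) = byte_stream.next().await {
let chunk = match chunk_result {
Ok(c) => c,
Err(e) => {
yield Err(ZclawError::LlmError(format!("Stream error: {}", e)));
return;
}
};
line_buffer.extend_from_slice(&chunk);
while let Some(pos) = line_buffer.iter().position(|&b| b == b'\n') {
let raw: Vec<u8> = line_buffer.drain(..=pos).collect();
let line = String::from_utf8_lossy(&raw);
let data = match line.trim_end().strip_prefix("data: ") {
Some(d) => d,
None => continue,
};
if data == "[DONE]" {
// Close every tool call before reporting completion
for (id, args) in tool_calls.drain(..) {
let input = serde_json::from_str(&args)
.unwrap_or(serde_json::Value::Object(Default::default()));
yield Ok(StreamChunk::ToolUseEnd { id, input });
}
yield Ok(StreamChunk::Complete {
input_tokens,
output_tokens,
stop_reason: finish_reason.take().unwrap_or_else(|| "end_turn".to_string()),
});
return;
}
let resp = match serde_json::from_str::<OpenAiStreamResponse>(data) {
Ok(resp) => resp,
Err(e) => {
tracing::warn!("Failed to parse OpenAI SSE: {}", e);
continue;
}
};
// Usage only arrives (in a final chunk) if the request asks for it
if let Some(usage) = resp.usage {
input_tokens = usage.prompt_tokens;
output_tokens = usage.completion_tokens;
}
let choice = match resp.choices.into_iter().next() {
Some(c) => c,
None => continue,
};
if choice.finish_reason.is_some() {
finish_reason = choice.finish_reason;
}
if let Some(content) = choice.delta.content {
yield Ok(StreamChunk::TextDelta { delta: content });
}
for tc in choice.delta.tool_calls.unwrap_or_default() {
let idx = tc.index as usize;
let function = tc.function.unwrap_or_default();
if let Some(id) = tc.id {
// First delta of a call: register it and announce the tool
if tool_calls.len() <= idx {
tool_calls.resize(idx + 1, (String::new(), String::new()));
}
tool_calls[idx].0 = id.clone();
yield Ok(StreamChunk::ToolUseStart {
id,
name: function.name.clone().unwrap_or_default(),
});
}
if let Some(args) = function.arguments {
if let Some(entry) = tool_calls.get_mut(idx) {
entry.1.push_str(&args);
let id = entry.0.clone();
yield Ok(StreamChunk::ToolUseDelta { id, delta: args });
}
}
}
}
}
})
}
- Step 3: Add the OpenAI streaming types
#[derive(Debug, Deserialize)]
struct OpenAiStreamResponse {
#[serde(default)]
choices: Vec<OpenAiStreamChoice>,
/// Only sent when the request sets stream_options.include_usage
#[serde(default)]
usage: Option<OpenAiStreamUsage>,
}
#[derive(Debug, Deserialize)]
struct OpenAiStreamChoice {
#[serde(default)]
delta: OpenAiDelta,
finish_reason: Option<String>,
}
#[derive(Debug, Default, Deserialize)]
struct OpenAiDelta {
content: Option<String>,
tool_calls: Option<Vec<OpenAiToolCallDelta>>,
}
#[derive(Debug, Deserialize)]
struct OpenAiToolCallDelta {
/// Position of the call in the message; present on every delta
#[serde(default)]
index: u32,
/// Only present on the first delta of each call
id: Option<String>,
function: Option<OpenAiFunctionDelta>,
}
#[derive(Debug, Default, Deserialize)]
struct OpenAiFunctionDelta {
name: Option<String>,
arguments: Option<String>,
}
#[derive(Debug, Deserialize)]
struct OpenAiStreamUsage {
prompt_tokens: u32,
completion_tokens: u32,
}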
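In OpenAI's streaming format, a tool call's `id` and `function.name` appear only in its first delta. Later deltas carry just `index` and a fragment of `function.arguments`, and fragments of several calls can interleave. A std-only sketch of merging the fragments by `index` follows. `ToolCallFragment`, `ToolCall` and `ToolCallAccumulator` are hypothetical names used for illustration, not types from the codebase.

```rust
use std::collections::BTreeMap;

/// One streamed tool-call fragment: `index` is always present, while
/// `id` and `name` are only set on the first fragment of each call.
#[derive(Debug)]
pub struct ToolCallFragment<'a> {
    pub index: u32,
    pub id: Option<&'a str>,
    pub name: Option<&'a str>,
    pub arguments: Option<&'a str>,
}

/// A fully assembled call; `arguments` is the concatenated JSON text.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct ToolCall {
    pub id: String,
    pub name: String,
    pub arguments: String,
}

/// Merges fragments by `index`; the BTreeMap yields calls in index order.
#[derive(Debug, Default)]
pub struct ToolCallAccumulator {
    calls: BTreeMap<u32, ToolCall>,
}

impl ToolCallAccumulator {
    pub fn push(&mut self, fragment: ToolCallFragment<'_>) {
        let call = self.calls.entry(fragment.index).or_default();
        if let Some(id) = fragment.id {
            call.id = id.to_string();
        }
        if let Some(name) = fragment.name {
            call.name = name.to_string();
        }
        if let Some(args) = fragment.arguments {
            call.arguments.push_str(args);
        }
    }

    /// Consumes the accumulator once the stream finishes (finish_reason or [DONE]).
    pub fn finish(self) -> Vec<ToolCall> {
        self.calls.into_values().collect()
    }
}
```

Only the final concatenated `arguments` string is guaranteed to be valid JSON, so parsing should wait until `finish`.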
- Step 4: Verify that it compiles
Run: cd crates/zclaw-runtime && cargo check
Expected: Compiles successfully
- Step 5: Commit the OpenAI streaming implementation
git add crates/zclaw-runtime/src/driver/openai.rs
git commit -m "feat(runtime): implement streaming for OpenAI driver
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>"
Chunk 3: Streaming Responses - LoopRunner and Tauri Integration
Task 3.1: Implement the streaming loop in LoopRunner
Files:
- Modify: crates/zclaw-runtime/src/loop_runner.rs
- Step 1: Implement the run_streaming method
Replace the existing TODO placeholder with:
use std::collections::HashMap;
use futures::StreamExt;
use crate::stream::StreamChunk;
// Tauri emission is compiled only with the optional `tauri` feature (see Step 2)
#[cfg(feature = "tauri")]
use crate::stream::StreamEvent;
#[cfg(feature = "tauri")]
use tauri::{AppHandle, Emitter};
/// Run the agent loop with streaming
pub async fn run_streaming(
&self,
session_id: SessionId,
input: String,
#[cfg(feature = "tauri")] app_handle: Option<AppHandle>,
) -> Result<mpsc::Receiver<LoopEvent>> {
let (tx, rx) = mpsc::channel(100);
// Add user message to session
let user_message = Message::user(input);
self.memory.append_message(&session_id, &user_message).await?;
// Get all messages for context
let messages = self.memory.get_messages(&session_id).await?;
// Build completion request
let request = CompletionRequest {
model: self.model.clone(),
system: self.system_prompt.clone(),
messages,
tools: self.tools.definitions(),
max_tokens: Some(self.max_tokens),
temperature: Some(self.temperature),
stop: Vec::new(),
stream: true,
};
// Clone necessary data for the async task
#[cfg(feature = "tauri")]
let session_id_str = session_id.to_string();
#[cfg(feature = "tauri")]
let agent_id_str = self.agent_id.to_string();
let memory = self.memory.clone();
let driver = self.driver.clone();
tokio::spawn(async move {
let mut full_response = String::new();
let mut input_tokens = 0u32;
let mut output_tokens = 0u32;
// ToolUseEnd only carries the tool id, so remember names from ToolUseStart
let mut tool_names: HashMap<String, String> = HashMap::new();
let mut stream = driver.stream(request);
while let Some(chunk_result) = stream.next().await {
match chunk_result {
Ok(chunk) => {
// Emit to the frontend if a Tauri handle was provided
#[cfg(feature = "tauri")]
{
if let Some(ref handle) = app_handle {
let event = StreamEvent::new(&session_id_str, &agent_id_str, chunk.clone());
let _ = handle.emit("stream:chunk", event);
}
}
// Track response and tokens
match &chunk {
StreamChunk::TextDelta { delta } => {
full_response.push_str(delta);
let _ = tx.send(LoopEvent::Delta(delta.clone())).await;
}
StreamChunk::ThinkingDelta { delta } => {
let _ = tx.send(LoopEvent::Delta(format!("[思考] {}", delta))).await;
}
StreamChunk::ToolUseStart { id, name } => {
tool_names.insert(id.clone(), name.clone());
}
StreamChunk::ToolUseEnd { id, input } => {
// The tool input is only complete here, so report the call now.
// NOTE: this version neither executes tools nor starts another
// iteration; send LoopEvent::ToolEnd once tool execution is added.
let _ = tx.send(LoopEvent::ToolStart {
name: tool_names.remove(id).unwrap_or_default(),
input: input.clone(),
}).await;
}
StreamChunk::Complete { input_tokens: it, output_tokens: ot, .. } => {
input_tokens = *it;
output_tokens = *ot;
}
StreamChunk::Error { message } => {
let _ = tx.send(LoopEvent::Error(message.clone())).await;
}
_ => {}
}
}
Err(e) => {
let _ = tx.send(LoopEvent::Error(e.to_string())).await;
}
}
}
// Save assistant message to memory
let assistant_message = Message::assistant(full_response.clone());
let _ = memory.append_message(&session_id, &assistant_message).await;
// Send completion event
let _ = tx.send(LoopEvent::Complete(AgentLoopResult {
response: full_response,
input_tokens,
output_tokens,
iterations: 1,
})).await;
});
Ok(rx)
}
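Stripped of channels and Tauri, the bookkeeping in run_streaming is a fold over the chunk sequence. The std-only sketch below shows that fold. `Chunk` is a hypothetical, simplified stand-in for `StreamChunk` (tool input kept as a JSON string rather than `serde_json::Value`), and `Turn`/`fold_turn` are illustrative names. Because `ToolUseEnd` carries only the tool id, the name has to be remembered from the matching `ToolUseStart`.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified stand-in for StreamChunk.
pub enum Chunk {
    TextDelta(String),
    ToolUseStart { id: String, name: String },
    ToolUseEnd { id: String, input: String },
    Complete { input_tokens: u32, output_tokens: u32 },
}

/// Everything one streamed assistant turn produced.
#[derive(Debug, Default, PartialEq)]
pub struct Turn {
    pub text: String,
    /// (tool name, complete input) pairs in the order the calls finished
    pub tool_calls: Vec<(String, String)>,
    pub input_tokens: u32,
    pub output_tokens: u32,
}

/// Folds a chunk sequence into a single turn.
pub fn fold_turn(chunks: impl IntoIterator<Item = Chunk>) -> Turn {
    let mut turn = Turn::default();
    // ToolUseEnd only carries the id, so remember each name from ToolUseStart
    let mut names: HashMap<String, String> = HashMap::new();
    for chunk in chunks {
        match chunk {
            Chunk::TextDelta(delta) => turn.text.push_str(&delta),
            Chunk::ToolUseStart { id, name } => {
                names.insert(id, name);
            }
            Chunk::ToolUseEnd { id, input } => {
                let name = names.remove(&id).unwrap_or_default();
                turn.tool_calls.push((name, input));
            }
            Chunk::Complete { input_tokens, output_tokens } => {
                turn.input_tokens = input_tokens;
                turn.output_tokens = output_tokens;
            }
        }
    }
    turn
}
```

Keeping the fold pure like this makes it easy to unit-test the accumulation logic separately from the async task and the event emission.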
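- Step 2: Add the tauri dependency as an optional feature
run_streaming uses tauri::AppHandle, but the runtime crate should not require Tauri in non-desktop builds. Declare tauri as an optional feature in crates/zclaw-runtime/Cargo.toml, gate every use of it with #[cfg(feature = "tauri")], and enable the feature from desktop/src-tauri/Cargo.toml where it depends on zclaw-runtime: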
[dependencies]
tauri = { version = "2", optional = true }
[features]
default = []
tauri = ["dep:tauri"]
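- Step 3: Verify that it compiles with and without the feature
Run: cd crates/zclaw-runtime && cargo check && cargo check --features tauri
Expected: Both checks succeed
- Step 4: Commit the LoopRunner streaming implementation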
git add crates/zclaw-runtime/src/loop_runner.rs crates/zclaw-runtime/Cargo.toml
git commit -m "feat(runtime): implement streaming in AgentLoop
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>"
Task 3.2: Add the Tauri streaming command
Files:
- Modify: desktop/src-tauri/src/lib.rs
- Step 1: Add the streaming chat command
Add to the Tauri commands:
use tauri::{AppHandle, State};
#[tauri::command]
async fn chat_stream(
agent_id: String,
session_id: String,
message: String,
app: AppHandle,
state: State<'_, KernelState>,
) -> Result<(), String> {
let agent_loop = state.kernel
.get_agent_loop(&agent_id)
.await
.map_err(|e| e.to_string())?;
let session: zclaw_types::SessionId = session_id.parse()
.map_err(|_| "Invalid session ID")?;
let _rx = agent_loop
.run_streaming(session, message, Some(app))
.await
.map_err(|e| e.to_string())?;
// run_streaming emits every chunk as a "stream:chunk" Tauri event, so this command
// only starts the stream; the LoopEvent receiver is unused here and dropped on return
Ok(())
}
- Step 2: Register the command
Register it in the invoke_handler in lib.rs:
.invoke_handler(tauri::generate_handler![
// ... existing commands
chat_stream,
])
- Step 3: Verify that it compiles
Run: cd desktop && pnpm tauri build --debug 2>&1 | head -50
Expected: The build starts; check the output for compile errors
- Step 4: Commit the Tauri streaming command
git add desktop/src-tauri/src/lib.rs
git commit -m "feat(tauri): add chat_stream command for streaming
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>"
Chunk 4: Streaming Responses - Frontend Integration
Task 4.1: Create the streaming client
Files:
- Create: desktop/src/lib/streaming-client.ts
- Step 1: Create the streaming client class
// desktop/src/lib/streaming-client.ts
import { listen, type UnlistenFn } from '@tauri-apps/api/event';
export interface StreamChunk {
type: 'text_delta' | 'thinking_delta' | 'tool_use_start' | 'tool_use_delta' | 'tool_use_end' | 'complete' | 'error';
delta?: string;
id?: string;
name?: string;
input?: unknown;
input_tokens?: number;
output_tokens?: number;
stop_reason?: string;
message?: string;
}
export interface StreamEvent {
session_id: string;
agent_id: string;
chunk: StreamChunk;
}
export type StreamHandler = (chunk: StreamChunk) => void;
export class StreamingClient {
private unlisten: UnlistenFn | null = null;
// Handlers are keyed by session; chunks for sessions without handlers are ignored
private handlers: Map<string, StreamHandler[]> = new Map();
async connect(): Promise<void> {
if (this.unlisten) return; // already listening
this.unlisten = await listen<StreamEvent>('stream:chunk', (event) => {
const { session_id, chunk } = event.payload;
// Iterate over a copy: a handler may unsubscribe itself (e.g. on 'complete')
const handlers = [...(this.handlers.get(session_id) ?? [])];
handlers.forEach(handler => handler(chunk));
});
}
async disconnect(): Promise<void> {
if (this.unlisten) {
this.unlisten();
this.unlisten = null;
}
this.handlers.clear();
}
subscribe(sessionId: string, handler: StreamHandler): () => void {
if (!this.handlers.has(sessionId)) {
this.handlers.set(sessionId, []);
}
this.handlers.get(sessionId)!.push(handler);
// Return unsubscribe function
return () => {
const handlers = this.handlers.get(sessionId);
if (handlers) {
const index = handlers.indexOf(handler);
if (index > -1) {
handlers.splice(index, 1);
}
}
};
}
clearSession(sessionId: string): void {
this.handlers.delete(sessionId);
}
}
// Singleton. Caching the promise (not the instance) makes concurrent callers
// share one connect() instead of getting a client that is not listening yet.
let streamingClientPromise: Promise<StreamingClient> | null = null;
export function getStreamingClient(): Promise<StreamingClient> {
if (!streamingClientPromise) {
const client = new StreamingClient();
streamingClientPromise = client.connect().then(
() => client,
(error) => {
// Allow a retry on the next call if listen() failed
streamingClientPromise = null;
throw error;
},
);
}
return streamingClientPromise;
}
- Step 2: Verify TypeScript compilation
Run: cd desktop && pnpm tsc --noEmit
Expected: No type errors
- Step 3: Commit the streaming client
git add desktop/src/lib/streaming-client.ts
git commit -m "feat(frontend): add streaming client for Tauri events
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>"
Task 4.2: Integrate streaming into ChatStore
Files:
- Modify: desktop/src/store/chatStore.ts
- Step 1: Add streaming state and handling
import { getStreamingClient, StreamChunk } from '../lib/streaming-client';
import { invoke } from '@tauri-apps/api/core';
interface ChatState {
// ... existing state
streamingMessage: string;
isStreaming: boolean;
streamError: string | null;
}
// In store actions:
async sendStreamingMessage(content: string) {
const sessionId = this.currentSessionId;
const agentId = this.currentAgentId;
if (!sessionId || !agentId) return;
// Add user message immediately
this.addMessage({
role: 'user',
content,
});
// Prepare for streaming
this.streamingMessage = '';
this.isStreaming = true;
this.streamError = null;
// Create placeholder assistant message
const assistantMessageId = this.addMessage({
role: 'assistant',
content: '',
});
// Declared before try so the catch block can also unsubscribe
let unsubscribe: (() => void) | undefined;
try {
const client = await getStreamingClient();
unsubscribe = client.subscribe(sessionId, (chunk: StreamChunk) => {
switch (chunk.type) {
case 'text_delta':
this.streamingMessage += chunk.delta || '';
this.updateMessage(assistantMessageId, {
content: this.streamingMessage,
});
break;
case 'thinking_delta':
// Handle thinking if needed
break;
case 'tool_use_start':
this.addMessage({
role: 'tool_use',
content: '',
toolName: chunk.name,
toolId: chunk.id,
});
break;
case 'complete':
this.isStreaming = false;
unsubscribe?.();
break;
case 'error':
this.streamError = chunk.message || 'Unknown error';
this.isStreaming = false;
unsubscribe?.();
break;
}
});
// Subscribe first, then invoke, so no early chunk is missed
await invoke('chat_stream', {
agentId,
sessionId,
message: content,
});
} catch (error) {
// The stream failed to start, so no 'complete' event will remove the handler
unsubscribe?.();
this.streamError = String(error);
this.isStreaming = false;
}
}
- Step 2: Verify TypeScript compilation
Run: cd desktop && pnpm tsc --noEmit
Expected: No type errors
- Step 3: Commit the ChatStore streaming integration
git add desktop/src/store/chatStore.ts
git commit -m "feat(frontend): integrate streaming into ChatStore
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>"
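Task 4.3: Update the chat UI component
Files:
- Modify: desktop/src/components/ChatPanel.tsx (or the corresponding component)
- Step 1: Add streaming send logic
In the send-message handler, branch on whether streaming is enabled: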
const handleSend = async (content: string) => {
if (useStreaming) {
await chatStore.sendStreamingMessage(content);
} else {
await chatStore.sendMessage(content);
}
};
- Step 2: Add a streaming indicator
{chatStore.isStreaming && (
<div className="flex items-center gap-2 text-sm text-gray-500">
<span className="animate-pulse">●</span>
正在生成...
</div>
)}
- Step 3: Verify that it compiles
Run: cd desktop && pnpm tsc --noEmit
Expected: No type errors
- Step 4: Commit the UI update
git add desktop/src/components/ChatPanel.tsx
git commit -m "feat(frontend): add streaming UI indicators
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>"
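Milestone M1: Streaming responses complete
- Verify the streaming response feature
Run: pnpm desktop (launches the app)
Expected:
- After a message is sent, the reply appears incrementally as it is generated
- An indicator is shown while streaming
- The state is correct once streaming completes
- Tag the milestone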
git tag v0.2.0-m1-streaming
git push origin v0.2.0-m1-streaming
Chunk 5: MCP Protocol - Transport Layer and Type Definitions
Task 5.1: Create the MCP JSON-RPC types
Files:
- Create: crates/zclaw-protocols/src/mcp_types.rs
- Step 1: Create the MCP JSON-RPC types file
//! MCP JSON-RPC 2.0 types
use serde::{Deserialize, Serialize};
/// JSON-RPC 2.0 Request
#[derive(Debug, Clone, Serialize)]
pub struct JsonRpcRequest {
pub jsonrpc: &'static str,
pub id: u64,
pub method: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub params: Option<serde_json::Value>,
}
impl JsonRpcRequest {
pub fn new(id: u64, method: impl Into<String>) -> Self {
Self {
jsonrpc: "2.0",
id,
method: method.into(),
params: None,
}
}
pub fn with_params(mut self, params: serde_json::Value) -> Self {
self.params = Some(params);
self
}
}
/// JSON-RPC 2.0 Response
#[derive(Debug, Clone, Deserialize)]
pub struct JsonRpcResponse {
pub jsonrpc: String,
pub id: u64,
#[serde(default)]
pub result: Option<serde_json::Value>,
#[serde(default)]
pub error: Option<JsonRpcError>,
}
/// JSON-RPC Error
#[derive(Debug, Clone, Deserialize)]
pub struct JsonRpcError {
pub code: i32,
pub message: String,
#[serde(default)]
pub data: Option<serde_json::Value>,
}
/// MCP Initialize Request
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")] // protocolVersion, clientInfo on the wire
pub struct InitializeRequest {
pub protocol_version: String,
pub capabilities: ClientCapabilities,
pub client_info: Implementation,
}
impl Default for InitializeRequest {
fn default() -> Self {
Self {
protocol_version: "2024-11-05".to_string(),
capabilities: ClientCapabilities::default(),
client_info: Implementation {
name: "zclaw".to_string(),
version: env!("CARGO_PKG_VERSION").to_string(),
},
}
}
}
/// Client capabilities
#[derive(Debug, Clone, Serialize, Default)]
pub struct ClientCapabilities {
#[serde(skip_serializing_if = "Option::is_none")]
pub roots: Option<RootsCapability>,
#[serde(skip_serializing_if = "Option::is_none")]
pub sampling: Option<SamplingCapability>,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct RootsCapability {
pub list_changed: bool,
}
#[derive(Debug, Clone, Serialize, Default)]
pub struct SamplingCapability {}
/// Server capabilities (from initialize response)
#[derive(Debug, Clone, Deserialize, Default)]
pub struct ServerCapabilities {
#[serde(default)]
pub tools: Option<ToolsCapability>,
#[serde(default)]
pub resources: Option<ResourcesCapability>,
#[serde(default)]
pub prompts: Option<PromptsCapability>,
#[serde(default)]
pub logging: Option<LoggingCapability>,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ToolsCapability {
#[serde(default)]
pub list_changed: bool,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ResourcesCapability {
#[serde(default)]
pub subscribe: bool,
#[serde(default)]
pub list_changed: bool,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PromptsCapability {
#[serde(default)]
pub list_changed: bool,
}
#[derive(Debug, Clone, Deserialize)]
pub struct LoggingCapability {}
/// Implementation info
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Implementation {
pub name: String,
pub version: String,
}
/// Initialize result
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")] // protocolVersion, serverInfo on the wire
pub struct InitializeResult {
pub protocol_version: String,
pub capabilities: ServerCapabilities,
pub server_info: Implementation,
}
/// Tool from tools/list
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")] // inputSchema on the wire
pub struct Tool {
pub name: String,
#[serde(default)]
pub description: Option<String>,
pub input_schema: serde_json::Value,
}
/// Resource from resources/list
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")] // mimeType on the wire
pub struct Resource {
pub uri: String,
pub name: String,
#[serde(default)]
pub description: Option<String>,
#[serde(default)]
pub mime_type: Option<String>,
}
/// Prompt from prompts/list
#[derive(Debug, Clone, Deserialize)]
pub struct Prompt {
pub name: String,
#[serde(default)]
pub description: Option<String>,
#[serde(default)]
pub arguments: Vec<PromptArgument>,
}
#[derive(Debug, Clone, Deserialize)]
pub struct PromptArgument {
pub name: String,
#[serde(default)]
pub description: Option<String>,
#[serde(default)]
pub required: bool,
}
/// Tool call result
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")] // isError on the wire
pub struct CallToolResult {
pub content: Vec<ContentBlock>,
#[serde(default)]
pub is_error: bool,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ContentBlock {
Text { text: String },
Image {
data: String,
#[serde(rename = "mimeType")]
mime_type: String,
},
Resource { resource: ResourceContent },
}
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")] // mimeType on the wire
pub struct ResourceContent {
pub uri: String,
#[serde(default)]
pub mime_type: Option<String>,
#[serde(default)]
pub text: Option<String>,
#[serde(default)]
pub blob: Option<String>,
}
/// Read resource result
#[derive(Debug, Clone, Deserialize)]
pub struct ReadResourceResult {
pub contents: Vec<ResourceContent>,
}
/// Get prompt result
#[derive(Debug, Clone, Deserialize)]
pub struct GetPromptResult {
#[serde(default)]
pub description: Option<String>,
pub messages: Vec<PromptMessage>,
}
#[derive(Debug, Clone, Deserialize)]
pub struct PromptMessage {
pub role: String,
pub content: ContentBlock,
}
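For reference, the client side of the MCP handshake is: an `initialize` request, then (after the server's result arrives) a `notifications/initialized` notification, and only then ordinary requests such as `tools/list`. MCP's JSON field names are camelCase (`protocolVersion`, `clientInfo`), so Rust structs need `#[serde(rename_all = "camelCase")]` to match. The sketch builds the client messages as plain strings to show the wire shape without serde. `handshake_messages` is a hypothetical helper, and it does not JSON-escape its arguments.

```rust
/// Client-side MCP handshake messages, in the order they are sent.
/// NOTE: `client_name` and `client_version` are inserted verbatim, without
/// JSON escaping, so they must not contain quotes or backslashes.
pub fn handshake_messages(client_name: &str, client_version: &str) -> Vec<String> {
    vec![
        // 1. initialize request; field names are camelCase on the wire
        format!(
            r#"{{"jsonrpc":"2.0","id":1,"method":"initialize","params":{{"protocolVersion":"2024-11-05","capabilities":{{}},"clientInfo":{{"name":"{}","version":"{}"}}}}}}"#,
            client_name, client_version
        ),
        // 2. sent once the initialize result arrives; a notification has no "id"
        r#"{"jsonrpc":"2.0","method":"notifications/initialized"}"#.to_string(),
        // 3. normal operation can begin, e.g. listing the server's tools
        r#"{"jsonrpc":"2.0","id":2,"method":"tools/list"}"#.to_string(),
    ]
}
```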
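- Step 2: Verify that it compiles
First declare the module in crates/zclaw-protocols/src/lib.rs (pub mod mcp_types;); otherwise cargo check never compiles the new file.
Run: cd crates/zclaw-protocols && cargo check
Expected: Compiles successfully
- Step 3: Commit the MCP type definitions
git add crates/zclaw-protocols/src/mcp_types.rs crates/zclaw-protocols/src/lib.rs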
git commit -m "feat(protocols): add MCP JSON-RPC type definitions
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>"
Task 5.2: Create the MCP transport layer
Files:
- Create: crates/zclaw-protocols/src/mcp_transport.rs
- Step 1: Create the transport trait and implementations
//! MCP Transport layer - stdio and HTTP
use async_trait::async_trait;
use serde_json::Value;
use std::process::Stdio;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::process::{Child, ChildStdin, ChildStdout, Command};
use zclaw_types::{Result, ZclawError};
use super::mcp_types::{JsonRpcRequest, JsonRpcResponse};
/// MCP Transport trait
#[async_trait]
pub trait McpTransport: Send + Sync {
/// Send a request and receive response
async fn request(&mut self, req: JsonRpcRequest) -> Result<JsonRpcResponse>;
/// Send a notification (no response expected)
async fn notify(&mut self, method: &str, params: Option<Value>) -> Result<()>;
/// Close the transport
async fn close(&mut self) -> Result<()>;
}
/// Build a JSON-RPC notification: unlike a request it has no "id",
/// and the receiver must not reply to it.
fn notification(method: &str, params: Option<Value>) -> Value {
let mut msg = serde_json::json!({ "jsonrpc": "2.0", "method": method });
if let Some(params) = params {
msg["params"] = params;
}
msg
}
/// Stdio transport for local MCP servers.
/// MCP frames stdio messages as newline-delimited JSON (one message per line,
/// no embedded newlines), not with LSP-style Content-Length headers.
pub struct StdioTransport {
process: Child,
stdin: ChildStdin,
stdout: BufReader<ChildStdout>,
}
impl StdioTransport {
/// Start a new MCP server process and connect via stdio
pub async fn start(command: &str, args: &[&str]) -> Result<Self> {
let mut cmd = Command::new(command);
cmd.args(args)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::null())
// Don't leave the server running if the transport is dropped without close()
.kill_on_drop(true);
let mut process = cmd.spawn()
.map_err(|e| ZclawError::McpError(format!("Failed to start MCP server: {}", e)))?;
let stdin = process.stdin.take()
.ok_or_else(|| ZclawError::McpError("Failed to open stdin".to_string()))?;
let stdout = process.stdout.take()
.ok_or_else(|| ZclawError::McpError("Failed to open stdout".to_string()))?;
Ok(Self {
process,
stdin,
stdout: BufReader::new(stdout),
})
}
/// Write one message as a single line of JSON
async fn write_line(&mut self, json: &str) -> Result<()> {
self.stdin.write_all(json.as_bytes()).await
.map_err(|e| ZclawError::McpError(format!("Write error: {}", e)))?;
self.stdin.write_all(b"\n").await
.map_err(|e| ZclawError::McpError(format!("Write error: {}", e)))?;
self.stdin.flush().await
.map_err(|e| ZclawError::McpError(format!("Flush error: {}", e)))
}
}
#[async_trait]
impl McpTransport for StdioTransport {
async fn request(&mut self, req: JsonRpcRequest) -> Result<JsonRpcResponse> {
// serde_json escapes newlines inside strings, so this is a single line
let json = serde_json::to_string(&req)
.map_err(|e| ZclawError::McpError(format!("JSON serialize error: {}", e)))?;
self.write_line(&json).await?;
// Read lines until the response carrying our id arrives. The server may
// interleave its own notifications (e.g. log messages) or requests; skip those.
let mut line = String::new();
loop {
line.clear();
let bytes_read = self.stdout.read_line(&mut line).await
.map_err(|e| ZclawError::McpError(format!("Read error: {}", e)))?;
if bytes_read == 0 {
return Err(ZclawError::McpError("MCP server closed stdout".to_string()));
}
let value: Value = match serde_json::from_str(line.trim()) {
Ok(v) => v,
Err(_) => continue, // blank or non-JSON line
};
let is_our_response = value.get("method").is_none()
&& value.get("id").and_then(Value::as_u64) == Some(req.id);
if is_our_response {
return serde_json::from_value(value)
.map_err(|e| ZclawError::McpError(format!("JSON parse error: {}", e)));
}
}
}
async fn notify(&mut self, method: &str, params: Option<Value>) -> Result<()> {
let json = notification(method, params).to_string();
self.write_line(&json).await
}
async fn close(&mut self) -> Result<()> {
let _ = self.process.kill().await;
Ok(())
}
}
/// HTTP transport for remote MCP servers.
/// Simplified: assumes each POST is answered with a plain JSON body;
/// SSE-streamed replies and session headers are not handled.
pub struct HttpTransport {
base_url: String,
client: reqwest::Client,
}
impl HttpTransport {
pub fn new(base_url: impl Into<String>) -> Self {
Self {
base_url: base_url.into(),
client: reqwest::Client::new(),
}
}
}
#[async_trait]
impl McpTransport for HttpTransport {
async fn request(&mut self, req: JsonRpcRequest) -> Result<JsonRpcResponse> {
let response = self.client
.post(&self.base_url)
.json(&req)
.send()
.await
.map_err(|e| ZclawError::McpError(format!("HTTP request error: {}", e)))?;
if !response.status().is_success() {
return Err(ZclawError::McpError(format!("HTTP error: {}", response.status())));
}
let jsonrpc_response: JsonRpcResponse = response.json().await
.map_err(|e| ZclawError::McpError(format!("JSON parse error: {}", e)))?;
Ok(jsonrpc_response)
}
async fn notify(&mut self, method: &str, params: Option<Value>) -> Result<()> {
// Fire-and-forget POST; the message has no id, so no JSON-RPC reply is expected
let response = self.client
.post(&self.base_url)
.json(&notification(method, params))
.send()
.await
.map_err(|e| ZclawError::McpError(format!("HTTP request error: {}", e)))?;
if !response.status().is_success() {
return Err(ZclawError::McpError(format!("HTTP error: {}", response.status())));
}
Ok(())
}
async fn close(&mut self) -> Result<()> {
Ok(())
}
}
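MCP's stdio transport frames each JSON-RPC message as a single line of JSON ending in `\n`, and messages must not contain embedded newlines. This differs from the `Content-Length` header framing used by the Language Server Protocol. A blocking, std-only sketch of that framing over any `Write`/`BufRead` pair follows; `write_message` and `read_message` are hypothetical helper names, not part of the crate.

```rust
use std::io::{self, BufRead, Write};

/// Writes one JSON-RPC message as a single line terminated by '\n'.
/// A message containing a raw newline would break the framing, so it is rejected.
pub fn write_message<W: Write>(writer: &mut W, json: &str) -> io::Result<()> {
    if json.contains('\n') {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "JSON-RPC message contains an embedded newline",
        ));
    }
    writer.write_all(json.as_bytes())?;
    writer.write_all(b"\n")?;
    writer.flush()
}

/// Reads the next non-empty line; Ok(None) means the server closed its stdout.
pub fn read_message<R: BufRead>(reader: &mut R) -> io::Result<Option<String>> {
    let mut line = String::new();
    loop {
        line.clear();
        if reader.read_line(&mut line)? == 0 {
            return Ok(None);
        }
        let trimmed = line.trim_end();
        if !trimmed.is_empty() {
            return Ok(Some(trimmed.to_string()));
        }
    }
}
```

A blocking version like this is easy to exercise with in-memory buffers such as `Vec<u8>` and `std::io::Cursor`; an async transport would do the same with tokio's `AsyncWriteExt` and `AsyncBufReadExt`.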
- Step 2: Add an McpError variant to ZclawError
Check crates/zclaw-types/src/error.rs and make sure it includes:
pub enum ZclawError {
// ... existing variants
McpError(String),
}
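- Step 3: Verify that it compiles
Declare the module in crates/zclaw-protocols/src/lib.rs (pub mod mcp_transport;). The transport relies on tokio's "process" and "io-util" features and on reqwest (with its "json" feature), serde_json and async-trait, so check that crates/zclaw-protocols/Cargo.toml provides them.
Run: cd crates/zclaw-protocols && cargo check
Expected: Compiles successfully
- Step 4: Commit the transport layer implementation
git add crates/zclaw-protocols/src/mcp_transport.rs crates/zclaw-protocols/src/lib.rs crates/zclaw-protocols/Cargo.toml crates/zclaw-types/src/error.rs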
git commit -m "feat(protocols): add MCP transport layer (stdio/HTTP)
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>"
Chunk 6: MCP Protocol - Client Implementation
Task 6.1: Implement the full MCP client
Files:
- Create: crates/zclaw-protocols/src/mcp_client.rs
- Modify: crates/zclaw-protocols/src/mcp.rs
- Modify: crates/zclaw-protocols/src/lib.rs
- Step 1: Create the MCP client
//! MCP Client implementation
use async_trait::async_trait;
use serde::Deserialize;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use zclaw_types::{Result, ZclawError};
use super::mcp_transport::McpTransport;
use super::mcp_types::*;
use super::{McpClient, McpTool, McpResource, McpPrompt, McpToolCallRequest, McpToolCallResponse, McpResourceContent};
/// MCP Client configuration
#[derive(Debug, Clone)]
pub struct McpClientConfig {
pub name: String,
pub transport_type: McpTransportType,
}
#[derive(Debug, Clone)]
pub enum McpTransportType {
Stdio { command: String, args: Vec<String> },
Http { url: String },
}
/// Full MCP client implementation
pub struct FullMcpClient {
config: McpClientConfig,
transport: Arc<RwLock<Box<dyn McpTransport>>>,
server_capabilities: RwLock<Option<ServerCapabilities>>,
server_info: RwLock<Option<Implementation>>,
initialized: RwLock<bool>,
}
impl FullMcpClient {
/// Create and initialize a new MCP client
pub async fn connect(config: McpClientConfig) -> Result<Self> {
let transport: Box<dyn McpTransport> = match &config.transport_type {
McpTransportType::Stdio { command, args } => {
let args_ref: Vec<&str> = args.iter().map(|s| s.as_str()).collect();
Box::new(
super::mcp_transport::StdioTransport::start(command, &args_ref).await?
)
}
McpTransportType::Http { url } => {
Box::new(super::mcp_transport::HttpTransport::new(url))
}
};
let client = Self {
config,
transport: Arc::new(RwLock::new(transport)),
server_capabilities: RwLock::new(None),
server_info: RwLock::new(None),
initialized: RwLock::new(false),
};
// Initialize connection
client.initialize().await?;
Ok(client)
}
/// Send initialize request
async fn initialize(&self) -> Result<()> {
let init_request = InitializeRequest::default();
let request = JsonRpcRequest::new(1, "initialize")
.with_params(serde_json::to_value(&init_request)
.map_err(|e| ZclawError::McpError(format!("Serialize error: {}", e)))?);
let mut transport = self.transport.write().await;
let response = transport.request(request).await?;
if let Some(error) = response.error {
return Err(ZclawError::McpError(format!("Initialize error: {}", error.message)));
}
// serde_json::Value has no deserialize() method; convert with serde_json::from_value
let result: InitializeResult = serde_json::from_value(
response.result
.ok_or_else(|| ZclawError::McpError("No result in initialize response".to_string()))?,
)
.map_err(|e| ZclawError::McpError(format!("Parse error: {}", e)))?;
// Store server info
*self.server_capabilities.write().await = Some(result.capabilities);
*self.server_info.write().await = Some(result.server_info);
// Send initialized notification
transport.notify("notifications/initialized", None).await?;
*self.initialized.write().await = true;
Ok(())
}
/// Check if server supports tools
pub async fn supports_tools(&self) -> bool {
self.server_capabilities.read().await
.as_ref()
.map(|c| c.tools.is_some())
.unwrap_or(false)
}
/// Check if server supports resources
pub async fn supports_resources(&self) -> bool {
self.server_capabilities.read().await
.as_ref()
.map(|c| c.resources.is_some())
.unwrap_or(false)
}
/// Check if server supports prompts
pub async fn supports_prompts(&self) -> bool {
self.server_capabilities.read().await
.as_ref()
.map(|c| c.prompts.is_some())
.unwrap_or(false)
}
}
#[async_trait]
impl McpClient for FullMcpClient {
async fn list_tools(&self) -> Result<Vec<McpTool>> {
if !self.supports_tools().await {
return Ok(Vec::new());
}
let request = JsonRpcRequest::new(2, "tools/list");
let response = self.transport.write().await.request(request).await?;
if let Some(error) = response.error {
return Err(ZclawError::McpError(format!("List tools error: {}", error.message)));
}
#[derive(Deserialize)]
struct ToolsListResult {
tools: Vec<Tool>,
}
let result: ToolsListResult = serde_json::from_value(
response.result
.ok_or_else(|| ZclawError::McpError("No result".to_string()))?,
)
.map_err(|e| ZclawError::McpError(format!("Parse error: {}", e)))?;
Ok(result.tools.into_iter().map(|t| McpTool {
name: t.name,
description: t.description.unwrap_or_default(),
input_schema: t.input_schema,
}).collect())
}
async fn call_tool(&self, request: McpToolCallRequest) -> Result<McpToolCallResponse> {
let params = serde_json::json!({
"name": request.name,
"arguments": request.arguments,
});
let rpc_request = JsonRpcRequest::new(3, "tools/call").with_params(params);
let response = self.transport.write().await.request(rpc_request).await?;
if let Some(error) = response.error {
return Err(ZclawError::McpError(format!("Call tool error: {}", error.message)));
}
let result: CallToolResult = serde_json::from_value(
response.result
.ok_or_else(|| ZclawError::McpError("No result".to_string()))?,
)
.map_err(|e| ZclawError::McpError(format!("Parse error: {}", e)))?;
Ok(McpToolCallResponse {
content: result.content.into_iter().map(|c| match c {
ContentBlock::Text { text } => super::McpContent::Text { text },
ContentBlock::Image { data, mime_type } => super::McpContent::Image { data, mime_type },
ContentBlock::Resource { resource } => super::McpContent::Resource {
resource: McpResourceContent {
uri: resource.uri,
mime_type: resource.mime_type,
text: resource.text,
blob: resource.blob,
},
},
}).collect(),
is_error: result.is_error,
})
}
async fn list_resources(&self) -> Result<Vec<McpResource>> {
if !self.supports_resources().await {
return Ok(Vec::new());
}
let request = JsonRpcRequest::new(4, "resources/list");
let response = self.transport.write().await.request(request).await?;
if let Some(error) = response.error {
return Err(ZclawError::McpError(format!("List resources error: {}", error.message)));
}
#[derive(Deserialize)]
struct ResourcesListResult {
resources: Vec<Resource>,
}
let result: ResourcesListResult = serde_json::from_value(
response.result
.ok_or_else(|| ZclawError::McpError("No result".to_string()))?,
)
.map_err(|e| ZclawError::McpError(format!("Parse error: {}", e)))?;
Ok(result.resources.into_iter().map(|r| McpResource {
uri: r.uri,
name: r.name,
description: r.description,
mime_type: r.mime_type,
}).collect())
}
async fn read_resource(&self, uri: &str) -> Result<McpResourceContent> {
let params = serde_json::json!({
"uri": uri,
});
let request = JsonRpcRequest::new(5, "resources/read").with_params(params);
let response = self.transport.write().await.request(request).await?;
if let Some(error) = response.error {
return Err(ZclawError::McpError(format!("Read resource error: {}", error.message)));
}
let result: ReadResourceResult = serde_json::from_value(
response.result
.ok_or_else(|| ZclawError::McpError("No result".to_string()))?,
)
.map_err(|e| ZclawError::McpError(format!("Parse error: {}", e)))?;
let content = result.contents.into_iter().next()
.ok_or_else(|| ZclawError::McpError("No content".to_string()))?;
Ok(McpResourceContent {
uri: content.uri,
mime_type: content.mime_type,
text: content.text,
blob: content.blob,
})
}
async fn list_prompts(&self) -> Result<Vec<McpPrompt>> {
if !self.supports_prompts().await {
return Ok(Vec::new());
}
let request = JsonRpcRequest::new(6, "prompts/list");
let response = self.transport.write().await.request(request).await?;
if let Some(error) = response.error {
return Err(ZclawError::McpError(format!("List prompts error: {}", error.message)));
}
#[derive(Deserialize)]
struct PromptsListResult {
prompts: Vec<Prompt>,
}
let result: PromptsListResult = serde_json::from_value(
response.result
.ok_or_else(|| ZclawError::McpError("No result".to_string()))?,
)
.map_err(|e| ZclawError::McpError(format!("Parse error: {}", e)))?;
Ok(result.prompts.into_iter().map(|p| McpPrompt {
name: p.name,
description: p.description.unwrap_or_default(),
arguments: p.arguments.into_iter().map(|a| super::McpPromptArgument {
name: a.name,
description: a.description.unwrap_or_default(),
required: a.required,
}).collect(),
}).collect())
}
async fn get_prompt(&self, name: &str, arguments: HashMap<String, String>) -> Result<String> {
let params = serde_json::json!({
"name": name,
"arguments": arguments,
});
let request = JsonRpcRequest::new(7, "prompts/get").with_params(params);
let response = self.transport.write().await.request(request).await?;
if let Some(error) = response.error {
return Err(ZclawError::McpError(format!("Get prompt error: {}", error.message)));
}
let result: GetPromptResult = serde_json::from_value(
response.result
.ok_or_else(|| ZclawError::McpError("No result".to_string()))?,
)
.map_err(|e| ZclawError::McpError(format!("Parse error: {}", e)))?;
// Convert messages to string
let prompt_text: String = result.messages.into_iter()
.filter_map(|m| match m.content {
ContentBlock::Text { text } => Some(format!("{}: {}", m.role, text)),
_ => None,
})
.collect::<Vec<_>>()
.join("\n");
Ok(prompt_text)
}
}
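The client above passes fixed ids (1 for `initialize`, 2 for `tools/list`, and so on), so every call to the same method reuses an id. This works for now because each call holds the transport lock until its response arrives. Once requests can be in flight concurrently, however, ids must be unique so responses can be matched to requests, and unique ids also make logs easier to correlate. The sketch below assumes a hypothetical `RequestIds` field added to `FullMcpClient`, whose value is passed to `JsonRpcRequest::new` in place of the literals:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hands out unique, increasing JSON-RPC request ids; safe to share across tasks.
#[derive(Debug, Default)]
pub struct RequestIds {
    next: AtomicU64,
}

impl RequestIds {
    /// Returns 1, 2, 3, ...; fetch_add makes each id unique without a lock.
    pub fn next_id(&self) -> u64 {
        self.next.fetch_add(1, Ordering::Relaxed) + 1
    }
}
```

Usage inside the client would look like `JsonRpcRequest::new(self.ids.next_id(), "tools/list")`, where `ids` is the hypothetical new field.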
- Step 2: Update the lib.rs exports
// crates/zclaw-protocols/src/lib.rs
pub mod mcp;
pub mod mcp_types;
pub mod mcp_transport;
pub mod mcp_client;
pub mod a2a;
pub use mcp::*;
pub use mcp_client::{FullMcpClient, McpClientConfig, McpTransportType};
- Step 3: Verify that it compiles
Run: cd crates/zclaw-protocols && cargo check
Expected: compiles successfully
- Step 4: Commit the MCP client implementation
git add crates/zclaw-protocols/src/mcp_client.rs crates/zclaw-protocols/src/lib.rs
git commit -m "feat(protocols): implement full MCP client with tools/resources/prompts
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>"
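Milestone M2: MCP protocol complete
- Verify MCP functionality
Create a test that connects to filesystem-mcp:
# Install filesystem-mcp
npm install -g @modelcontextprotocol/server-filesystem
# Run the test
cargo test --package zclaw-protocols --test mcp_test
Expected: connects successfully and lists files
- Tag the milestone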
git tag v0.2.0-m2-mcp
git push origin v0.2.0-m2-mcp
Chunk 7: Browser Hand Implementation
Task 7.1: Add the browser automation dependency
Files:
- Modify: crates/zclaw-hands/Cargo.toml
- Step 1: Add the chromiumoxide dependency
[dependencies]
# ... existing dependencies
chromiumoxide = { version = "0.7", features = ["tokio-runtime"] }
tokio-stream = "0.1"
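Note: chromiumoxide is used in place of playwright-rust because its API is more stable. It drives an installed Chrome/Chromium through the Chrome DevTools Protocol.
- Step 2: Verify that the dependencies download
Run: cd crates/zclaw-hands && cargo fetch
Expected: dependencies download successfully
- Step 3: Commit the dependency update
git add crates/zclaw-hands/Cargo.toml Cargo.lock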
git commit -m "chore(hands): add chromiumoxide dependency for Browser Hand
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>"
Task 7.2: Implement the Browser Hand
Files:
- Create: crates/zclaw-hands/src/hands/browser.rs
- Modify: crates/zclaw-hands/src/hands/mod.rs
- Step 1: Create the Browser Hand implementation
//! Browser Hand - Browser automation capability
use async_trait::async_trait;
use base64::Engine;
use chromiumoxide::cdp::browser_protocol::page::CaptureScreenshotFormat;
use chromiumoxide::page::ScreenshotParams;
use chromiumoxide::{Browser, BrowserConfig, Page};
use serde::Deserialize;
use serde_json::{json, Value};
use std::time::{Duration, Instant};
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tokio_stream::StreamExt;
use zclaw_types::{Result, ZclawError};
use crate::hand::{Hand, HandConfig, HandContext, HandResult};

/// A running browser: the CDP connection, the task driving its event loop,
/// and the tab that all actions operate on.
struct BrowserSession {
    browser: Browser,
    handler_task: JoinHandle<()>,
    page: Option<Page>,
}

/// Browser Hand - provides browser automation capabilities
pub struct BrowserHand {
    config: HandConfig,
    session: Mutex<Option<BrowserSession>>,
}

impl BrowserHand {
    pub fn new() -> Self {
        Self {
            config: HandConfig {
                id: "browser".to_string(),
                name: "Browser".to_string(),
                description: "Browser automation hand - automates web page interaction and data collection".to_string(),
                needs_approval: true,
                dependencies: vec!["chromium".to_string()],
                input_schema: Some(json!({
                    "type": "object",
                    "properties": {
                        "action": {
                            "type": "string",
                            "enum": ["navigate", "click", "input", "screenshot", "wait", "evaluate", "close"]
                        },
                        "url": { "type": "string", "description": "URL to navigate to" },
                        "selector": { "type": "string", "description": "CSS selector" },
                        "text": { "type": "string", "description": "Text to input" },
                        "timeout": { "type": "number", "description": "Timeout in milliseconds" },
                        "script": { "type": "string", "description": "JavaScript to evaluate" },
                    },
                    "required": ["action"]
                })),
                tags: vec!["browser".to_string(), "automation".to_string(), "web-scraping".to_string()],
                enabled: true,
            },
            session: Mutex::new(None),
        }
    }

    /// Launch Chromium and spawn the task that drives the CDP handler.
    async fn launch() -> Result<BrowserSession> {
        let config = BrowserConfig::builder()
            .window_size(1920, 1080)
            .request_timeout(Duration::from_secs(30))
            .build()
            .map_err(|e| ZclawError::HandError(format!("Browser config error: {}", e)))?;
        // launch() returns the Browser plus a Handler that must be polled
        // continuously; without it no CDP command ever completes.
        let (browser, mut handler) = Browser::launch(config)
            .await
            .map_err(|e| ZclawError::HandError(format!("Browser launch error: {}", e)))?;
        let handler_task = tokio::spawn(async move {
            while let Some(event) = handler.next().await {
                if event.is_err() {
                    break;
                }
            }
        });
        Ok(BrowserSession { browser, handler_task, page: None })
    }

    /// Return the active tab, launching the browser and opening a tab on first use.
    async fn page(&self) -> Result<Page> {
        let mut guard = self.session.lock().await;
        if guard.is_none() {
            *guard = Some(Self::launch().await?);
        }
        let session = guard.as_mut().expect("session was just initialized");
        if let Some(page) = &session.page {
            return Ok(page.clone());
        }
        let page = session.browser.new_page("about:blank").await
            .map_err(|e| ZclawError::HandError(format!("Page create error: {}", e)))?;
        session.page = Some(page.clone());
        Ok(page)
    }

    /// Return the active tab, or an error if navigate has not been called yet.
    async fn current_page(&self) -> Result<Page> {
        let guard = self.session.lock().await;
        guard
            .as_ref()
            .and_then(|s| s.page.clone())
            .ok_or_else(|| ZclawError::HandError("No active page; run navigate first".to_string()))
    }

    async fn navigate(&self, url: &str) -> Result<Value> {
        // Reuse one tab so later actions operate on the page that was just loaded
        let page = self.page().await?;
        page.goto(url).await
            .map_err(|e| ZclawError::HandError(format!("Navigate error: {}", e)))?;
        page.wait_for_navigation().await
            .map_err(|e| ZclawError::HandError(format!("Wait navigation error: {}", e)))?;
        let title = page.get_title().await
            .map_err(|e| ZclawError::HandError(format!("Get title error: {}", e)))?;
        Ok(json!({ "success": true, "url": url, "title": title.unwrap_or_default() }))
    }

    async fn click(&self, selector: &str) -> Result<Value> {
        let page = self.current_page().await?;
        page.find_element(selector).await
            .map_err(|e| ZclawError::HandError(format!("Find element error: {}", e)))?
            .click().await
            .map_err(|e| ZclawError::HandError(format!("Click error: {}", e)))?;
        Ok(json!({ "success": true, "selector": selector }))
    }

    async fn input_text(&self, selector: &str, text: &str) -> Result<Value> {
        let page = self.current_page().await?;
        page.find_element(selector).await
            .map_err(|e| ZclawError::HandError(format!("Find element error: {}", e)))?
            .type_str(text).await
            .map_err(|e| ZclawError::HandError(format!("Input error: {}", e)))?;
        Ok(json!({ "success": true, "selector": selector, "input_length": text.len() }))
    }

    async fn screenshot(&self) -> Result<Value> {
        let page = self.current_page().await?;
        let params = ScreenshotParams::builder()
            .format(CaptureScreenshotFormat::Png)
            .build();
        let png = page.screenshot(params).await
            .map_err(|e| ZclawError::HandError(format!("Screenshot error: {}", e)))?;
        // base64 0.22 uses the Engine API; the old base64::encode function is gone
        let data = base64::engine::general_purpose::STANDARD.encode(&png);
        Ok(json!({ "success": true, "format": "png", "data": data, "size_bytes": png.len() }))
    }

    async fn wait_for(&self, selector: &str, timeout_ms: u64) -> Result<Value> {
        let page = self.current_page().await?;
        let deadline = Instant::now() + Duration::from_millis(timeout_ms);
        // Poll find_element until the selector matches or the deadline passes
        loop {
            if page.find_element(selector).await.is_ok() {
                return Ok(json!({ "success": true, "selector": selector }));
            }
            if Instant::now() >= deadline {
                return Err(ZclawError::HandError(format!("Wait timeout for: {}", selector)));
            }
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
    }

    async fn evaluate(&self, script: &str) -> Result<Value> {
        let page = self.current_page().await?;
        let result = page.evaluate(script).await
            .map_err(|e| ZclawError::HandError(format!("Evaluate error: {}", e)))?;
        // value() is None when the script produces no JSON value (e.g. undefined)
        let value = result.value().cloned().unwrap_or(Value::Null);
        Ok(json!({ "success": true, "result": value }))
    }

    async fn close(&self) -> Result<Value> {
        // Take the session out first so the lock is not held while closing
        let session = self.session.lock().await.take();
        if let Some(mut session) = session {
            session.browser.close().await
                .map_err(|e| ZclawError::HandError(format!("Close error: {}", e)))?;
            // The handler is only needed while CDP commands are outstanding
            session.handler_task.abort();
        }
        Ok(json!({ "success": true }))
    }
}

impl Default for BrowserHand {
    fn default() -> Self {
        Self::new()
    }
}

#[async_trait]
impl Hand for BrowserHand {
    fn config(&self) -> &HandConfig {
        &self.config
    }

    async fn execute(&self, _context: &HandContext, input: Value) -> Result<HandResult> {
        #[derive(Deserialize)]
        struct BrowserInput {
            action: String,
            url: Option<String>,
            selector: Option<String>,
            text: Option<String>,
            timeout: Option<u64>,
            script: Option<String>,
        }
        let input: BrowserInput = serde_json::from_value(input)
            .map_err(|e| ZclawError::HandError(format!("Invalid input: {}", e)))?;
        let result = match input.action.as_str() {
            "navigate" => {
                let url = input.url.ok_or_else(||
                    ZclawError::HandError("URL required for navigate".to_string()))?;
                self.navigate(&url).await?
            }
            "click" => {
                let selector = input.selector.ok_or_else(||
                    ZclawError::HandError("Selector required for click".to_string()))?;
                self.click(&selector).await?
            }
            "input" => {
                let selector = input.selector.ok_or_else(||
                    ZclawError::HandError("Selector required for input".to_string()))?;
                let text = input.text.ok_or_else(||
                    ZclawError::HandError("Text required for input".to_string()))?;
                self.input_text(&selector, &text).await?
            }
            "screenshot" => self.screenshot().await?,
            "wait" => {
                let selector = input.selector.ok_or_else(||
                    ZclawError::HandError("Selector required for wait".to_string()))?;
                let timeout = input.timeout.unwrap_or(5000);
                self.wait_for(&selector, timeout).await?
            }
            "evaluate" => {
                let script = input.script.ok_or_else(||
                    ZclawError::HandError("Script required for evaluate".to_string()))?;
                self.evaluate(&script).await?
            }
            "close" => self.close().await?,
            _ => {
                return Ok(HandResult::error(format!("Unknown action: {}", input.action)));
            }
        };
        Ok(HandResult::success(result))
    }

    fn is_dependency_available(&self, dep: &str) -> bool {
        match dep {
            // Best-effort check for a Chrome/Chromium binary on PATH
            "chromium" => ["chromium", "chromium-browser", "google-chrome", "chrome"]
                .iter()
                .any(|bin| std::process::Command::new(bin).arg("--version").output().is_ok()),
            _ => true,
        }
    }
}
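`execute` matches on the `action` string and checks the required fields inside each arm. An alternative design is to convert the input into a typed enum up front: each action's required parameters are then validated in one place, and the dispatch `match` becomes exhaustive. The sketch below assumes the JSON has already been deserialized into the optional fields that `BrowserInput` declares. `BrowserAction` and `parse_action` are hypothetical names, not part of the plan's code:

```rust
/// One variant per action in the Browser Hand input schema (hypothetical).
#[derive(Debug, PartialEq)]
pub enum BrowserAction {
    Navigate { url: String },
    Click { selector: String },
    Input { selector: String, text: String },
    Screenshot,
    Wait { selector: String, timeout_ms: u64 },
    Evaluate { script: String },
    Close,
}

/// Validate the loosely typed fields for `action` and build a typed action.
pub fn parse_action(
    action: &str,
    url: Option<String>,
    selector: Option<String>,
    text: Option<String>,
    timeout: Option<u64>,
    script: Option<String>,
) -> Result<BrowserAction, String> {
    // Same "<Field> required for <action>" wording as execute() uses
    let need = |v: Option<String>, field: &str| {
        v.ok_or_else(|| format!("{} required for {}", field, action))
    };
    Ok(match action {
        "navigate" => BrowserAction::Navigate { url: need(url, "URL")? },
        "click" => BrowserAction::Click { selector: need(selector, "Selector")? },
        "input" => BrowserAction::Input {
            selector: need(selector, "Selector")?,
            text: need(text, "Text")?,
        },
        "screenshot" => BrowserAction::Screenshot,
        "wait" => BrowserAction::Wait {
            selector: need(selector, "Selector")?,
            timeout_ms: timeout.unwrap_or(5000),
        },
        "evaluate" => BrowserAction::Evaluate { script: need(script, "Script")? },
        "close" => BrowserAction::Close,
        other => return Err(format!("Unknown action: {}", other)),
    })
}
```

With this in place, `execute` would match on `BrowserAction` variants, so the compiler flags any action that is added to the enum but never handled.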
- Step 2: Add the base64 dependency
Add to crates/zclaw-hands/Cargo.toml:
base64 = "0.22"
- Step 3: Update the mod.rs exports
// crates/zclaw-hands/src/hands/mod.rs
//! Hands - Autonomous capabilities
mod whiteboard;
mod slideshow;
mod speech;
mod quiz;
mod browser;
pub use whiteboard::*;
pub use slideshow::*;
pub use speech::*;
pub use quiz::*;
pub use browser::*;
- Step 4: Add a HandError variant to ZclawError
Make sure crates/zclaw-types/src/error.rs contains:
HandError(String),
- Step 5: Verify that it compiles
Run: cd crates/zclaw-hands && cargo check
Expected: compiles successfully
- Step 6: Commit the Browser Hand implementation
git add crates/zclaw-hands/src/hands/browser.rs crates/zclaw-hands/src/hands/mod.rs crates/zclaw-hands/Cargo.toml crates/zclaw-types/src/error.rs
git commit -m "feat(hands): implement Browser Hand with chromiumoxide
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>"
Milestone M3: Browser Hand complete
- Verify Browser Hand functionality
Run: cd crates/zclaw-hands && cargo test browser_hand -- --nocapture
Expected: tests pass
- Tag the milestone
git tag v0.2.0-m3-browser
git push origin v0.2.0-m3-browser
Chunk 8: Tool Security + Release Preparation
Task 8.1: Create the security configuration file
Files:
- Create: config/security.toml
- Step 1: Create the security configuration
# ZCLAW Security Configuration
# Controls which commands and operations are allowed
[shell_exec]
# Enable shell command execution
enabled = true
# Default timeout in seconds
default_timeout = 60
# Maximum output size in bytes
max_output_size = 1048576 # 1MB
# Whitelist of allowed commands
# If whitelist is non-empty, only these commands are allowed
allowed_commands = [
"git",
"npm",
"pnpm",
"node",
"cargo",
"rustc",
"python",
"python3",
"pip",
"ls",
"cat",
"echo",
"mkdir",
"rm",
"cp",
"mv",
"grep",
"find",
"head",
"tail",
"wc",
]
# Blacklist of dangerous commands (always blocked)
blocked_commands = [
"rm -rf /",
"dd",
"mkfs",
"format",
"shutdown",
"reboot",
"init",
"systemctl",
]
[file_read]
enabled = true
# Allowed directory prefixes (empty = allow all)
allowed_paths = []
# Blocked paths (always blocked)
blocked_paths = [
"/etc/shadow",
"/etc/passwd",
"~/.ssh",
"~/.gnupg",
]
[file_write]
enabled = true
# Maximum file size in bytes (10MB)
max_file_size = 10485760
# Blocked paths
blocked_paths = [
"/etc",
"/usr",
"/bin",
"/sbin",
"C:\\Windows",
"C:\\Program Files",
]
[web_fetch]
enabled = true
# Request timeout in seconds
timeout = 30
# Maximum response size in bytes (10MB)
max_response_size = 10485760
# Block internal/private IP ranges (SSRF protection)
block_private_ips = true
# Allowed domains (empty = allow all)
allowed_domains = []
# Blocked domains
blocked_domains = []
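`block_private_ips = true` only helps if the fetch tool resolves the target host and rejects internal addresses before connecting, and repeats the check after every redirect. Checking hostnames alone misses DNS names that resolve to internal addresses. The following is a minimal std-only sketch of the address check. `is_blocked_ip` is a hypothetical helper, and the ranges listed (loopback, private, link-local, carrier-grade NAT, unique-local) are common choices rather than an exhaustive list:

```rust
use std::net::{IpAddr, Ipv4Addr};

/// True if `ip` is in a range a web fetch should never reach (SSRF guard).
pub fn is_blocked_ip(ip: IpAddr) -> bool {
    match ip {
        IpAddr::V4(v4) => is_blocked_v4(v4),
        IpAddr::V6(v6) => {
            // IPv4-mapped addresses (::ffff:a.b.c.d) must be judged as IPv4
            if let Some(v4) = v6.to_ipv4_mapped() {
                return is_blocked_v4(v4);
            }
            let first = v6.segments()[0];
            v6.is_loopback()
                || v6.is_unspecified()
                || (first & 0xfe00) == 0xfc00 // fc00::/7 unique local
                || (first & 0xffc0) == 0xfe80 // fe80::/10 link local
        }
    }
}

fn is_blocked_v4(v4: Ipv4Addr) -> bool {
    let [a, b, _, _] = v4.octets();
    v4.is_private() // 10/8, 172.16/12, 192.168/16
        || v4.is_loopback() // 127/8
        || v4.is_link_local() // 169.254/16, includes the 169.254.169.254 metadata address
        || v4.is_unspecified()
        || v4.is_broadcast()
        || (a == 100 && (64..=127).contains(&b)) // 100.64/10 carrier-grade NAT
}
```

A fetch tool would run every address returned by DNS resolution through this check and refuse the request if any of them is blocked.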
- Step 2: Commit the security configuration
git add config/security.toml
git commit -m "feat(config): add security configuration file
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>"
Task 8.2: Implement tool security validation
Files:
- Modify: crates/zclaw-runtime/src/tool/builtin/shell_exec.rs
- Step 1: Implement command whitelist validation
//! Shell execution tool with security controls
use async_trait::async_trait;
use serde::Deserialize;
use std::collections::HashSet;
use std::time::{Duration, Instant};
use tokio::process::Command; // async process API (needs tokio's "process" feature)
use tokio::time::timeout;
use zclaw_types::{Result, ZclawError};
use super::{BuiltinTool, ToolContext, ToolResult};

/// Characters that let one `sh -c` / `cmd /C` string chain, pipe, redirect or
/// substitute extra commands; rejecting them keeps the first-word whitelist meaningful.
const SHELL_METACHARS: &[char] = &[';', '|', '&', '`', '$', '>', '<', '\n', '\r'];

/// Security configuration for shell execution
#[derive(Debug, Clone, Deserialize)]
pub struct ShellSecurityConfig {
    pub enabled: bool,
    pub default_timeout: u64,
    pub max_output_size: usize,
    pub allowed_commands: Vec<String>,
    pub blocked_commands: Vec<String>,
}

impl Default for ShellSecurityConfig {
    fn default() -> Self {
        Self {
            enabled: true,
            default_timeout: 60,
            max_output_size: 1024 * 1024, // 1MB
            allowed_commands: vec![
                "git".to_string(), "npm".to_string(), "pnpm".to_string(),
                "node".to_string(), "cargo".to_string(), "rustc".to_string(),
                "python".to_string(), "python3".to_string(), "pip".to_string(),
                "ls".to_string(), "cat".to_string(), "echo".to_string(),
            ],
            blocked_commands: vec![
                "rm -rf /".to_string(), "dd".to_string(), "mkfs".to_string(),
                "shutdown".to_string(), "reboot".to_string(),
            ],
        }
    }
}

impl ShellSecurityConfig {
    /// Load the [shell_exec] table from config/security.toml (resolved relative
    /// to the current working directory), falling back to the defaults.
    pub fn load() -> Self {
        if let Ok(content) = std::fs::read_to_string("config/security.toml") {
            if let Ok(config) = toml::from_str::<SecurityConfigFile>(&content) {
                return config.shell_exec;
            }
        }
        Self::default()
    }

    /// Check if a command is allowed
    pub fn is_command_allowed(&self, command: &str) -> Result<()> {
        if let Some(c) = command.chars().find(|c| SHELL_METACHARS.contains(c)) {
            return Err(ZclawError::SecurityError(
                format!("Shell metacharacter not allowed: {:?}", c)
            ));
        }
        // Collapse whitespace so "rm  -rf   /" still matches "rm -rf /"
        let normalized = command.split_whitespace().collect::<Vec<_>>().join(" ");
        let base_cmd = normalized.split(' ').next().unwrap_or("");
        // Check blocked commands first
        for blocked in &self.blocked_commands {
            // Single-word entries match only the command name, so "dd" does not
            // block "git add"; multi-word entries match anywhere in the command.
            let hit = if blocked.contains(' ') {
                normalized.contains(blocked.as_str())
            } else {
                base_cmd == blocked.as_str()
            };
            if hit {
                return Err(ZclawError::SecurityError(
                    format!("Command blocked: {}", blocked)
                ));
            }
        }
        // If whitelist is non-empty, check against it
        if !self.allowed_commands.is_empty() {
            let allowed: HashSet<&str> = self.allowed_commands
                .iter()
                .map(|s| s.as_str())
                .collect();
            if !allowed.contains(base_cmd) {
                return Err(ZclawError::SecurityError(
                    format!("Command not in whitelist: {}", base_cmd)
                ));
            }
        }
        Ok(())
    }
}

#[derive(Debug, Deserialize)]
struct SecurityConfigFile {
    shell_exec: ShellSecurityConfig,
}

/// Truncate to at most `max` bytes without splitting a UTF-8 character.
fn truncate_utf8(s: &str, max: usize) -> &str {
    if s.len() <= max {
        return s;
    }
    let mut end = max;
    while !s.is_char_boundary(end) {
        end -= 1;
    }
    &s[..end]
}

/// Shell execution tool
pub struct ShellExecTool {
    config: ShellSecurityConfig,
}

impl ShellExecTool {
    pub fn new() -> Self {
        Self {
            config: ShellSecurityConfig::load(),
        }
    }
}

impl Default for ShellExecTool {
    fn default() -> Self {
        Self::new()
    }
}

#[async_trait]
impl BuiltinTool for ShellExecTool {
    fn name(&self) -> &str {
        "shell_exec"
    }

    fn description(&self) -> &str {
        "Execute shell commands with security controls"
    }

    fn input_schema(&self) -> serde_json::Value {
        serde_json::json!({
            "type": "object",
            "properties": {
                "command": {
                    "type": "string",
                    "description": "The shell command to execute"
                },
                "timeout": {
                    "type": "number",
                    "description": "Timeout in seconds (default: 60)"
                }
            },
            "required": ["command"]
        })
    }

    async fn execute(&self, _context: &ToolContext, input: serde_json::Value) -> Result<ToolResult> {
        #[derive(Deserialize)]
        struct Input {
            command: String,
            timeout: Option<u64>,
        }
        let input: Input = serde_json::from_value(input)
            .map_err(|e| ZclawError::ToolError(format!("Invalid input: {}", e)))?;
        if !self.config.enabled {
            return Err(ZclawError::SecurityError("Shell execution is disabled".to_string()));
        }
        // Security check
        self.config.is_command_allowed(&input.command)?;
        // Default to the configured timeout; cap requests at twice that value
        let timeout_secs = input.timeout
            .unwrap_or(self.config.default_timeout)
            .min(self.config.default_timeout * 2);
        let timeout_duration = Duration::from_secs(timeout_secs);
        let (shell, flag) = if cfg!(target_os = "windows") { ("cmd", "/C") } else { ("sh", "-c") };
        let mut cmd = Command::new(shell);
        // The async Command lets timeout() actually interrupt the wait, and
        // kill_on_drop kills the child when the timed-out future is dropped.
        cmd.args([flag, input.command.as_str()]).kill_on_drop(true);
        let start = Instant::now();
        let output = timeout(timeout_duration, cmd.output())
            .await
            .map_err(|_| ZclawError::ToolError(format!("Command timed out after {}s", timeout_secs)))?
            .map_err(|e| ZclawError::ToolError(format!("Failed to execute command: {}", e)))?;
        let duration_ms = start.elapsed().as_millis() as u64;
        // Truncate output if too large
        let max = self.config.max_output_size;
        let stdout = String::from_utf8_lossy(&output.stdout);
        let stderr = String::from_utf8_lossy(&output.stderr);
        Ok(ToolResult {
            success: output.status.success(),
            output: serde_json::json!({
                "stdout": truncate_utf8(&stdout, max),
                "stderr": truncate_utf8(&stderr, max),
                "exit_code": output.status.code(),
                "duration_ms": duration_ms,
            }),
            error: if output.status.success() {
                None
            } else {
                Some(format!("Exit code: {:?}", output.status.code()))
            },
        })
    }
}
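- Step 2: Add SecurityError to ZclawError
Add to crates/zclaw-types/src/error.rs:
SecurityError(String),
ShellSecurityConfig::load() parses the file with the toml crate, so also make sure crates/zclaw-runtime/Cargo.toml lists toml under [dependencies].
- Step 3: Verify that it compiles
Run: cd crates/zclaw-runtime && cargo check
Expected: compiles successfully
- Step 4: Commit the tool security implementation
git add crates/zclaw-runtime/src/tool/builtin/shell_exec.rs crates/zclaw-runtime/Cargo.toml crates/zclaw-types/src/error.rs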
git commit -m "feat(runtime): implement shell command security whitelist
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>"
Milestone M4: Tool security complete
- Verify the security checks
Run: cd crates/zclaw-runtime && cargo test shell_exec -- --nocapture
Expected: security tests pass and malicious commands are blocked
- Tag the milestone
git tag v0.2.0-m4-security
git push origin v0.2.0-m4-security
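Release Preparation Checklist
Week 7-8: Final acceptance
- Feature acceptance
  - Streaming: messages render incrementally, latency < 500ms
  - MCP: connects to filesystem-mcp and reads a file successfully
  - Browser Hand: opens a page, clicks, and takes a screenshot successfully
  - Tool security: malicious commands are blocked
  - Core chat: send a message → receive a response → no crashes
- Documentation acceptance
  - User manual updated
  - CHANGELOG written
  - README updated
- Packaging acceptance
  - Windows installer builds successfully
  - Self-signed certificate configured
  - Installation test passes
- Release acceptance
  - GitHub Release draft prepared
  - Feedback channel set up (GitHub Issues)
Final release commands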
# Build the release
cd desktop && pnpm tauri build
# Create the tag
git tag -a v0.2.0 -m "ZCLAW v0.2.0 - Streaming, MCP, Browser Hand"
# Push the tag
git push origin v0.2.0
# Upload to GitHub Releases
gh release create v0.2.0 \
./desktop/src-tauri/target/release/bundle/msi/*.msi \
./desktop/src-tauri/target/release/bundle/nsis/*.exe \
--title "ZCLAW v0.2.0" \
--notes-file CHANGELOG.md
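Risk Contingency Plan
| Risk | Contingency |
|---|---|
| Streaming responses slip | Fall back to a simplified SSE approach |
| MCP compatibility issues | Prioritize filesystem-mcp support |
| Browser dependency issues | Prompt the user to install Chromium |
| Insufficient test coverage | Extend Week 6 to allow more testing time |
| Post-release crashes | Ship a hotfix within 24h |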