Files
zclaw_openfang/crates/zclaw-protocols/src/mcp_transport.rs
iven bf6d81f9c6
Some checks failed
CI / Rust Check (push) Has been cancelled
CI / Lint & TypeCheck (push) Has been cancelled
CI / Unit Tests (push) Has been cancelled
CI / Build Frontend (push) Has been cancelled
CI / Security Scan (push) Has been cancelled
CI / E2E Tests (push) Has been cancelled
refactor: 清理未使用代码并添加未来功能标记
style: 统一代码格式和注释风格

docs: 更新多个功能文档的完整度和状态

feat(runtime): 添加路径验证工具支持

fix(pipeline): 改进条件判断和变量解析逻辑

test(types): 为ID类型添加全面测试用例

chore: 更新依赖项和Cargo.lock文件

perf(mcp): 优化MCP协议传输和错误处理
2026-03-25 21:55:12 +08:00

409 lines
14 KiB
Rust

//! MCP Transport Layer
//!
//! Implements stdio-based transport for MCP server communication.
use std::collections::HashMap;
use std::io::{BufRead, BufReader, BufWriter, Write};
use std::process::{Child, ChildStdin, ChildStdout, Command, Stdio};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;
use async_trait::async_trait;
use serde::de::DeserializeOwned;
use tokio::sync::Mutex;
use tracing::{debug, warn};
use zclaw_types::{Result, ZclawError};
use crate::mcp_types::*;
use crate::{McpClient, McpContent, McpPrompt, McpPromptArgument, McpResource, McpResourceContent, McpTool, McpToolCallRequest, McpToolCallResponse};
/// Process-wide counter backing JSON-RPC request IDs; the first ID handed out is 1.
static REQUEST_ID: AtomicU64 = AtomicU64::new(1);

/// Returns the current counter value and advances the counter by one.
///
/// The counter is a single static shared by every caller in the process, so
/// consecutive calls observe consecutive values.
fn next_request_id() -> u64 {
    AtomicU64::fetch_add(&REQUEST_ID, 1, Ordering::SeqCst)
}
/// Describes how to launch an MCP server as a child process.
#[derive(Debug, Clone)]
pub struct McpServerConfig {
    /// Executable to invoke (e.g., "npx", "node", "python")
    pub command: String,
    /// Arguments passed to the executable, in order
    pub args: Vec<String>,
    /// Extra environment variables for the child process
    pub env: HashMap<String, String>,
    /// Working directory for the child process, if one was requested
    pub cwd: Option<String>,
}

impl McpServerConfig {
    /// Shared constructor: the given command and arguments, an empty
    /// environment map and no working directory.
    fn from_parts(command: &str, args: Vec<String>) -> Self {
        Self {
            command: String::from(command),
            args,
            env: HashMap::new(),
            cwd: None,
        }
    }

    /// Builds a configuration that runs `npx -y <package>`.
    pub fn npx(package: &str) -> Self {
        Self::from_parts("npx", vec![String::from("-y"), String::from(package)])
    }

    /// Builds a configuration that runs `node <script>`.
    pub fn node(script: &str) -> Self {
        Self::from_parts("node", vec![String::from(script)])
    }

    /// Builds a configuration that runs `python <script>`.
    pub fn python(script: &str) -> Self {
        Self::from_parts("python", vec![String::from(script)])
    }

    /// Builder step: records one environment variable, replacing any earlier
    /// value stored under the same key.
    pub fn env(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        let (name, contents) = (key.into(), value.into());
        self.env.insert(name, contents);
        self
    }

    /// Builder step: records the working directory, replacing any earlier one.
    pub fn cwd(self, cwd: impl Into<String>) -> Self {
        Self {
            cwd: Some(cwd.into()),
            ..self
        }
    }
}
/// MCP transport that talks to a server child process over its stdin/stdout pipes.
///
/// Every handle sits behind its own async mutex and holds `None` until `start`
/// has spawned the process.
pub struct McpTransport {
/// Launch settings `start` uses to spawn the server process.
config: McpServerConfig,
/// Handle to the spawned process; `Drop` takes it out to kill and reap it.
child: Arc<Mutex<Option<Child>>>,
/// Buffered writer over the child's stdin; JSON-RPC requests are written here.
stdin: Arc<Mutex<Option<BufWriter<ChildStdin>>>>,
/// Buffered reader over the child's stdout; JSON-RPC responses are read from here.
stdout: Arc<Mutex<Option<BufReader<ChildStdout>>>>,
/// Capabilities recorded by `initialize`; `None` until it has succeeded.
capabilities: Arc<Mutex<Option<ServerCapabilities>>>,
}
impl McpTransport {
/// Create new MCP transport
pub fn new(config: McpServerConfig) -> Self {
Self {
config,
child: Arc::new(Mutex::new(None)),
stdin: Arc::new(Mutex::new(None)),
stdout: Arc::new(Mutex::new(None)),
capabilities: Arc::new(Mutex::new(None)),
}
}
/// Start the MCP server process
pub async fn start(&self) -> Result<()> {
let mut child_guard = self.child.lock().await;
if child_guard.is_some() {
return Ok(()); // Already started
}
// Build command
let mut cmd = Command::new(&self.config.command);
cmd.args(&self.config.args);
// Set environment
for (key, value) in &self.config.env {
cmd.env(key, value);
}
// Set working directory
if let Some(cwd) = &self.config.cwd {
cmd.current_dir(cwd);
}
// Configure stdio - pipe stderr for debugging
cmd.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped());
// Spawn process
let mut child = cmd.spawn()
.map_err(|e| ZclawError::McpError(format!("Failed to start MCP server: {}", e)))?;
// Take stdin and stdout
let stdin = child.stdin.take()
.ok_or_else(|| ZclawError::McpError("Failed to get stdin".to_string()))?;
let stdout = child.stdout.take()
.ok_or_else(|| ZclawError::McpError("Failed to get stdout".to_string()))?;
// Take stderr and spawn a background thread to log it
if let Some(stderr) = child.stderr.take() {
let server_name = self.config.command.clone();
thread::spawn(move || {
let reader = BufReader::new(stderr);
for line in reader.lines() {
match line {
Ok(text) => {
debug!(server = %server_name, stderr = %text, "MCP server stderr");
}
Err(e) => {
warn!(server = %server_name, error = %e, "Failed to read MCP server stderr");
break;
}
}
}
debug!(server = %server_name, "MCP server stderr stream ended");
});
}
// Store handles in separate mutexes
*self.stdin.lock().await = Some(BufWriter::new(stdin));
*self.stdout.lock().await = Some(BufReader::new(stdout));
*child_guard = Some(child);
Ok(())
}
/// Initialize MCP connection
pub async fn initialize(&self) -> Result<()> {
// Ensure server is started
self.start().await?;
let request = InitializeRequest::default();
let _: InitializeResult = self.send_request("initialize", Some(&request)).await?;
// Store capabilities
let mut capabilities = self.capabilities.lock().await;
*capabilities = Some(ServerCapabilities::default());
Ok(())
}
/// Send JSON-RPC request
async fn send_request<T: DeserializeOwned>(
&self,
method: &str,
params: Option<&impl serde::Serialize>,
) -> Result<T> {
// Build request
let id = next_request_id();
let request = JsonRpcRequest {
jsonrpc: "2.0",
id,
method: method.to_string(),
params: params.map(|p| serde_json::to_value(p).unwrap_or(serde_json::Value::Null)),
};
// Serialize request
let line = serde_json::to_string(&request)
.map_err(|e| ZclawError::McpError(format!("Failed to serialize request: {}", e)))?;
// Write to stdin
{
let mut stdin_guard = self.stdin.lock().await;
let stdin = stdin_guard.as_mut()
.ok_or_else(|| ZclawError::McpError("Transport not started".to_string()))?;
stdin.write_all(line.as_bytes())
.map_err(|e| ZclawError::McpError(format!("Failed to write request: {}", e)))?;
stdin.write_all(b"\n")
.map_err(|e| ZclawError::McpError(format!("Failed to write newline: {}", e)))?;
stdin.flush()
.map_err(|e| ZclawError::McpError(format!("Failed to flush request: {}", e)))?;
}
// Read from stdout
let response_line = {
let mut stdout_guard = self.stdout.lock().await;
let stdout = stdout_guard.as_mut()
.ok_or_else(|| ZclawError::McpError("Transport not started".to_string()))?;
let mut response_line = String::new();
stdout.read_line(&mut response_line)
.map_err(|e| ZclawError::McpError(format!("Failed to read response: {}", e)))?;
response_line
};
// Parse response
let response: JsonRpcResponse<T> = serde_json::from_str(&response_line)
.map_err(|e| ZclawError::McpError(format!("Failed to parse response: {}", e)))?;
if let Some(error) = response.error {
return Err(ZclawError::McpError(format!("MCP error {}: {}", error.code, error.message)));
}
response.result.ok_or_else(|| ZclawError::McpError("No result in response".to_string()))
}
/// List available tools
pub async fn list_tools(&self) -> Result<Vec<Tool>> {
let result: ListToolsResult = self.send_request("tools/list", None::<&()>).await?;
Ok(result.tools)
}
/// Call a tool
pub async fn call_tool(&self, name: &str, arguments: serde_json::Value) -> Result<CallToolResult> {
let params = serde_json::json!({
"name": name,
"arguments": arguments
});
let result: CallToolResult = self.send_request("tools/call", Some(&params)).await?;
Ok(result)
}
/// List available resources
pub async fn list_resources(&self) -> Result<Vec<Resource>> {
let result: ListResourcesResult = self.send_request("resources/list", None::<&()>).await?;
Ok(result.resources)
}
/// Read a resource
pub async fn read_resource(&self, uri: &str) -> Result<ReadResourceResult> {
let params = serde_json::json!({
"uri": uri
});
let result: ReadResourceResult = self.send_request("resources/read", Some(&params)).await?;
Ok(result)
}
/// List available prompts
pub async fn list_prompts(&self) -> Result<Vec<Prompt>> {
let result: ListPromptsResult = self.send_request("prompts/list", None::<&()>).await?;
Ok(result.prompts)
}
/// Get a prompt
pub async fn get_prompt(&self, name: &str, arguments: Option<serde_json::Value>) -> Result<GetPromptResult> {
let mut params = serde_json::json!({
"name": name
});
if let Some(args) = arguments {
params["arguments"] = args;
}
let result: GetPromptResult = self.send_request("prompts/get", Some(&params)).await?;
Ok(result)
}
}
/// `McpClient` implementation that forwards each call to the inherent
/// `McpTransport` method of the same name and converts the protocol types
/// into the crate's client-facing types.
#[async_trait]
impl McpClient for McpTransport {
    /// Lists the server's tools; a missing description becomes an empty string.
    async fn list_tools(&self) -> Result<Vec<McpTool>> {
        let tools = McpTransport::list_tools(self).await?;
        Ok(tools
            .into_iter()
            .map(|t| McpTool {
                name: t.name,
                description: t.description.unwrap_or_default(),
                input_schema: t.input_schema,
            })
            .collect())
    }

    /// Calls the tool named in `request` and converts every returned content block.
    ///
    /// # Errors
    ///
    /// Returns `ZclawError::McpError` if the arguments cannot be serialized or
    /// the underlying request fails.
    async fn call_tool(&self, request: McpToolCallRequest) -> Result<McpToolCallResponse> {
        let args = serde_json::to_value(&request.arguments)
            .map_err(|e| ZclawError::McpError(format!("Failed to serialize arguments: {}", e)))?;
        let result = McpTransport::call_tool(self, &request.name, args).await?;

        let content = result
            .content
            .into_iter()
            .map(|c| match c {
                ContentBlock::Text { text } => McpContent::Text { text },
                ContentBlock::Image { data, mime_type } => McpContent::Image { data, mime_type },
                ContentBlock::Resource { resource } => McpContent::Resource {
                    resource: McpResourceContent {
                        uri: resource.uri,
                        mime_type: resource.mime_type,
                        text: resource.text,
                        blob: resource.blob,
                    },
                },
            })
            .collect();

        Ok(McpToolCallResponse {
            content,
            is_error: result.is_error,
        })
    }

    /// Lists the server's resources.
    async fn list_resources(&self) -> Result<Vec<McpResource>> {
        let resources = McpTransport::list_resources(self).await?;
        Ok(resources
            .into_iter()
            .map(|r| McpResource {
                uri: r.uri,
                name: r.name,
                description: r.description,
                mime_type: r.mime_type,
            })
            .collect())
    }

    /// Reads the resource at `uri` and returns the first content item of the result.
    ///
    /// # Errors
    ///
    /// Returns `ZclawError::McpError` if the request fails or the result
    /// contains no content items.
    async fn read_resource(&self, uri: &str) -> Result<McpResourceContent> {
        let result = McpTransport::read_resource(self, uri).await?;
        // The result is owned, so take the first item by value instead of
        // cloning its fields (the blob in particular may be large).
        let content = result
            .contents
            .into_iter()
            .next()
            .ok_or_else(|| ZclawError::McpError("No resource content".to_string()))?;
        Ok(McpResourceContent {
            uri: content.uri,
            mime_type: content.mime_type,
            text: content.text,
            blob: content.blob,
        })
    }

    /// Lists the server's prompts; missing descriptions become empty strings.
    async fn list_prompts(&self) -> Result<Vec<McpPrompt>> {
        let prompts = McpTransport::list_prompts(self).await?;
        Ok(prompts
            .into_iter()
            .map(|p| McpPrompt {
                name: p.name,
                description: p.description.unwrap_or_default(),
                arguments: p
                    .arguments
                    .into_iter()
                    .map(|a| McpPromptArgument {
                        name: a.name,
                        description: a.description.unwrap_or_default(),
                        required: a.required,
                    })
                    .collect(),
            })
            .collect())
    }

    /// Fetches the prompt `name` and renders its text messages as
    /// `"<role>: <text>"` lines joined by newlines.
    ///
    /// An empty `arguments` map is sent as "no arguments". Messages whose
    /// content is not text are left out of the rendered string.
    async fn get_prompt(&self, name: &str, arguments: HashMap<String, String>) -> Result<String> {
        let args = if arguments.is_empty() {
            None
        } else {
            Some(serde_json::to_value(&arguments)
                .map_err(|e| ZclawError::McpError(format!("Failed to serialize arguments: {}", e)))?)
        };
        let result = McpTransport::get_prompt(self, name, args).await?;

        let prompt_text: Vec<String> = result
            .messages
            .into_iter()
            .filter_map(|m| match m.content {
                ContentBlock::Text { text } => Some(format!("{}: {}", m.role, text)),
                _ => None,
            })
            .collect();
        Ok(prompt_text.join("\n"))
    }
}
impl Drop for McpTransport {
fn drop(&mut self) {
// Try to kill the child process synchronously
// We use a blocking approach here since Drop cannot be async
if let Ok(mut child_guard) = self.child.try_lock() {
if let Some(mut child) = child_guard.take() {
// Try to kill the process gracefully
match child.kill() {
Ok(_) => {
// Wait for the process to exit
let _ = child.wait();
}
Err(e) => {
eprintln!("[McpTransport] Failed to kill child process: {}", e);
}
}
}
}
}
}