//! MCP Transport Layer //! //! Implements stdio-based transport for MCP server communication. use std::collections::HashMap; use std::io::{BufRead, BufReader, BufWriter, Write}; use std::process::{Child, ChildStdin, ChildStdout, Command, Stdio}; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use std::thread; use async_trait::async_trait; use serde::de::DeserializeOwned; use tokio::sync::Mutex; use tracing::{debug, warn}; use zclaw_types::{Result, ZclawError}; use crate::mcp_types::*; use crate::{McpClient, McpContent, McpPrompt, McpPromptArgument, McpResource, McpResourceContent, McpTool, McpToolCallRequest, McpToolCallResponse}; /// Global request ID counter static REQUEST_ID: AtomicU64 = AtomicU64::new(1); /// Generate next request ID fn next_request_id() -> u64 { REQUEST_ID.fetch_add(1, Ordering::SeqCst) } /// MCP Server process configuration #[derive(Debug, Clone)] pub struct McpServerConfig { /// Command to run (e.g., "npx", "node", "python") pub command: String, /// Arguments for the command pub args: Vec, /// Environment variables pub env: HashMap, /// Working directory pub cwd: Option, } impl McpServerConfig { /// Create configuration for npx-based MCP server pub fn npx(package: &str) -> Self { Self { command: "npx".to_string(), args: vec!["-y".to_string(), package.to_string()], env: HashMap::new(), cwd: None, } } /// Create configuration for node-based MCP server pub fn node(script: &str) -> Self { Self { command: "node".to_string(), args: vec![script.to_string()], env: HashMap::new(), cwd: None, } } /// Create configuration for python-based MCP server pub fn python(script: &str) -> Self { Self { command: "python".to_string(), args: vec![script.to_string()], env: HashMap::new(), cwd: None, } } /// Add environment variable pub fn env(mut self, key: impl Into, value: impl Into) -> Self { self.env.insert(key.into(), value.into()); self } /// Set working directory pub fn cwd(mut self, cwd: impl Into) -> Self { self.cwd = Some(cwd.into()); self } } /// Combined transport handles (stdin + stdout) behind a single Mutex. /// This ensures write-then-read is atomic, preventing concurrent requests /// from receiving each other's responses. struct TransportHandles { stdin: BufWriter, stdout: BufReader, } /// MCP Transport using stdio pub struct McpTransport { config: McpServerConfig, child: Arc>>, /// Single Mutex protecting both stdin and stdout for atomic write-then-read handles: Arc>>, capabilities: Arc>>, } impl McpTransport { /// Create new MCP transport pub fn new(config: McpServerConfig) -> Self { Self { config, child: Arc::new(Mutex::new(None)), handles: Arc::new(Mutex::new(None)), capabilities: Arc::new(Mutex::new(None)), } } /// Start the MCP server process pub async fn start(&self) -> Result<()> { let mut child_guard = self.child.lock().await; if child_guard.is_some() { return Ok(()); // Already started } // Build command let mut cmd = Command::new(&self.config.command); cmd.args(&self.config.args); // Set environment for (key, value) in &self.config.env { cmd.env(key, value); } // Set working directory if let Some(cwd) = &self.config.cwd { cmd.current_dir(cwd); } // Configure stdio - pipe stderr for debugging cmd.stdin(Stdio::piped()) .stdout(Stdio::piped()) .stderr(Stdio::piped()); // Spawn process let mut child = cmd.spawn() .map_err(|e| ZclawError::McpError(format!("Failed to start MCP server: {}", e)))?; // Take stdin and stdout let stdin = child.stdin.take() .ok_or_else(|| ZclawError::McpError("Failed to get stdin".to_string()))?; let stdout = child.stdout.take() .ok_or_else(|| ZclawError::McpError("Failed to get stdout".to_string()))?; // Take stderr and spawn a background thread to log it if let Some(stderr) = child.stderr.take() { let server_name = self.config.command.clone(); thread::spawn(move || { let reader = BufReader::new(stderr); for line in reader.lines() { match line { Ok(text) => { debug!(server = %server_name, stderr = %text, "MCP server stderr"); } Err(e) => { warn!(server = %server_name, error = %e, "Failed to read MCP server stderr"); break; } } } debug!(server = %server_name, "MCP server stderr stream ended"); }); } // Store handles in single mutex for atomic write-then-read *self.handles.lock().await = Some(TransportHandles { stdin: BufWriter::new(stdin), stdout: BufReader::new(stdout), }); *child_guard = Some(child); Ok(()) } /// Initialize MCP connection pub async fn initialize(&self) -> Result<()> { // Ensure server is started self.start().await?; let request = InitializeRequest::default(); let init_result: InitializeResult = self.send_request("initialize", Some(&request)).await?; // Store actual server capabilities (not empty defaults) let mut capabilities = self.capabilities.lock().await; *capabilities = Some(init_result.capabilities); drop(capabilities); // Send initialized notification (required by MCP spec) self.send_notification(&InitializedNotification::new()).await?; debug!( server = %init_result.server_info.name, version = %init_result.server_info.version, protocol = %init_result.protocol_version, "MCP server initialized" ); Ok(()) } /// Send JSON-RPC notification (no response expected) async fn send_notification(&self, notification: &impl serde::Serialize) -> Result<()> { let line = serde_json::to_string(notification) .map_err(|e| ZclawError::McpError(format!("Failed to serialize notification: {}", e)))?; let mut handles_guard = self.handles.lock().await; let handles = handles_guard.as_mut() .ok_or_else(|| ZclawError::McpError("Transport not started".to_string()))?; handles.stdin.write_all(line.as_bytes()) .map_err(|e| ZclawError::McpError(format!("Failed to write notification: {}", e)))?; handles.stdin.write_all(b"\n") .map_err(|e| ZclawError::McpError(format!("Failed to write newline: {}", e)))?; handles.stdin.flush() .map_err(|e| ZclawError::McpError(format!("Failed to flush notification: {}", e)))?; Ok(()) } /// Send JSON-RPC request (atomic write-then-read under single lock) async fn send_request( &self, method: &str, params: Option<&impl serde::Serialize>, ) -> Result { // Build request let id = next_request_id(); let request = JsonRpcRequest { jsonrpc: "2.0", id, method: method.to_string(), params: params.map(|p| serde_json::to_value(p).unwrap_or(serde_json::Value::Null)), }; // Serialize request let line = serde_json::to_string(&request) .map_err(|e| ZclawError::McpError(format!("Failed to serialize request: {}", e)))?; // Atomic write-then-read under single lock let response_line = { let mut handles_guard = self.handles.lock().await; let handles = handles_guard.as_mut() .ok_or_else(|| ZclawError::McpError("Transport not started".to_string()))?; // Write to stdin handles.stdin.write_all(line.as_bytes()) .map_err(|e| ZclawError::McpError(format!("Failed to write request: {}", e)))?; handles.stdin.write_all(b"\n") .map_err(|e| ZclawError::McpError(format!("Failed to write newline: {}", e)))?; handles.stdin.flush() .map_err(|e| ZclawError::McpError(format!("Failed to flush request: {}", e)))?; // Read from stdout (still holding the lock — no interleaving possible) let mut response_line = String::new(); handles.stdout.read_line(&mut response_line) .map_err(|e| ZclawError::McpError(format!("Failed to read response: {}", e)))?; response_line }; // Parse response let response: JsonRpcResponse = serde_json::from_str(&response_line) .map_err(|e| ZclawError::McpError(format!("Failed to parse response: {}", e)))?; if let Some(error) = response.error { return Err(ZclawError::McpError(format!("MCP error {}: {}", error.code, error.message))); } response.result.ok_or_else(|| ZclawError::McpError("No result in response".to_string())) } /// List available tools pub async fn list_tools(&self) -> Result> { let result: ListToolsResult = self.send_request("tools/list", None::<&()>).await?; Ok(result.tools) } /// Call a tool pub async fn call_tool(&self, name: &str, arguments: serde_json::Value) -> Result { let params = serde_json::json!({ "name": name, "arguments": arguments }); let result: CallToolResult = self.send_request("tools/call", Some(¶ms)).await?; Ok(result) } /// List available resources pub async fn list_resources(&self) -> Result> { let result: ListResourcesResult = self.send_request("resources/list", None::<&()>).await?; Ok(result.resources) } /// Read a resource pub async fn read_resource(&self, uri: &str) -> Result { let params = serde_json::json!({ "uri": uri }); let result: ReadResourceResult = self.send_request("resources/read", Some(¶ms)).await?; Ok(result) } /// List available prompts pub async fn list_prompts(&self) -> Result> { let result: ListPromptsResult = self.send_request("prompts/list", None::<&()>).await?; Ok(result.prompts) } /// Get a prompt pub async fn get_prompt(&self, name: &str, arguments: Option) -> Result { let mut params = serde_json::json!({ "name": name }); if let Some(args) = arguments { params["arguments"] = args; } let result: GetPromptResult = self.send_request("prompts/get", Some(¶ms)).await?; Ok(result) } } /// Implement McpClient trait for McpTransport #[async_trait] impl McpClient for McpTransport { async fn list_tools(&self) -> Result> { let tools = McpTransport::list_tools(self).await?; Ok(tools.into_iter().map(|t| McpTool { name: t.name, description: t.description.unwrap_or_default(), input_schema: t.input_schema, }).collect()) } async fn call_tool(&self, request: McpToolCallRequest) -> Result { let args = serde_json::to_value(&request.arguments) .map_err(|e| ZclawError::McpError(format!("Failed to serialize arguments: {}", e)))?; let result = McpTransport::call_tool(self, &request.name, args).await?; Ok(McpToolCallResponse { content: result.content.into_iter().map(|c| match c { ContentBlock::Text { text } => McpContent::Text { text }, ContentBlock::Image { data, mime_type } => McpContent::Image { data, mime_type }, ContentBlock::Resource { resource } => McpContent::Resource { resource: McpResourceContent { uri: resource.uri, mime_type: resource.mime_type, text: resource.text, blob: resource.blob, } } }).collect(), is_error: result.is_error, }) } async fn list_resources(&self) -> Result> { let resources = McpTransport::list_resources(self).await?; Ok(resources.into_iter().map(|r| McpResource { uri: r.uri, name: r.name, description: r.description, mime_type: r.mime_type, }).collect()) } async fn read_resource(&self, uri: &str) -> Result { let result = McpTransport::read_resource(self, uri).await?; // Get first content item let content = result.contents.first() .ok_or_else(|| ZclawError::McpError("No resource content".to_string()))?; Ok(McpResourceContent { uri: content.uri.clone(), mime_type: content.mime_type.clone(), text: content.text.clone(), blob: content.blob.clone(), }) } async fn list_prompts(&self) -> Result> { let prompts = McpTransport::list_prompts(self).await?; Ok(prompts.into_iter().map(|p| McpPrompt { name: p.name, description: p.description.unwrap_or_default(), arguments: p.arguments.into_iter().map(|a| McpPromptArgument { name: a.name, description: a.description.unwrap_or_default(), required: a.required, }).collect(), }).collect()) } async fn get_prompt(&self, name: &str, arguments: HashMap) -> Result { let args = if arguments.is_empty() { None } else { Some(serde_json::to_value(&arguments) .map_err(|e| ZclawError::McpError(format!("Failed to serialize arguments: {}", e)))?) }; let result = McpTransport::get_prompt(self, name, args).await?; // Combine messages into a string let prompt_text: Vec = result.messages.into_iter() .filter_map(|m| match m.content { ContentBlock::Text { text } => Some(format!("{}: {}", m.role, text)), _ => None, }) .collect(); Ok(prompt_text.join("\n")) } } impl Drop for McpTransport { fn drop(&mut self) { // Try to kill the child process synchronously // We use a blocking approach here since Drop cannot be async if let Ok(mut child_guard) = self.child.try_lock() { if let Some(mut child) = child_guard.take() { // Try to kill the process gracefully match child.kill() { Ok(_) => { // Wait for the process to exit let _ = child.wait(); } Err(e) => { tracing::warn!("[McpTransport] Failed to kill child process (potential zombie): {}", e); } } } } } }