diff --git a/crates/zclaw-protocols/src/lib.rs b/crates/zclaw-protocols/src/lib.rs index d1a6704..1331221 100644 --- a/crates/zclaw-protocols/src/lib.rs +++ b/crates/zclaw-protocols/src/lib.rs @@ -3,7 +3,11 @@ //! Protocol support for MCP (Model Context Protocol) and A2A (Agent-to-Agent). mod mcp; +mod mcp_types; +mod mcp_transport; mod a2a; pub use mcp::*; +pub use mcp_types::*; +pub use mcp_transport::*; pub use a2a::*; diff --git a/crates/zclaw-protocols/src/mcp_transport.rs b/crates/zclaw-protocols/src/mcp_transport.rs new file mode 100644 index 0000000..bd70f71 --- /dev/null +++ b/crates/zclaw-protocols/src/mcp_transport.rs @@ -0,0 +1,365 @@ +//! MCP Transport Layer +//! +//! Implements stdio-based transport for MCP server communication. + +use std::collections::HashMap; +use std::io::{BufRead, BufReader, BufWriter, Write}; +use std::process::{Child, ChildStdin, ChildStdout, Command, Stdio}; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; + +use async_trait::async_trait; +use serde::de::DeserializeOwned; +use tokio::sync::Mutex; + +use zclaw_types::{Result, ZclawError}; + +use crate::mcp_types::*; +use crate::{McpClient, McpContent, McpPrompt, McpPromptArgument, McpResource, McpResourceContent, McpTool, McpToolCallRequest, McpToolCallResponse}; + +/// Global request ID counter +static REQUEST_ID: AtomicU64 = AtomicU64::new(1); + +/// Generate next request ID +fn next_request_id() -> u64 { + REQUEST_ID.fetch_add(1, Ordering::SeqCst) +} + +/// MCP Server process configuration +#[derive(Debug, Clone)] +pub struct McpServerConfig { + /// Command to run (e.g., "npx", "node", "python") + pub command: String, + /// Arguments for the command + pub args: Vec, + /// Environment variables + pub env: HashMap, + /// Working directory + pub cwd: Option, +} + +impl McpServerConfig { + /// Create configuration for npx-based MCP server + pub fn npx(package: &str) -> Self { + Self { + command: "npx".to_string(), + args: vec!["-y".to_string(), package.to_string()], + env: HashMap::new(), + cwd: None, + } + } + + /// Create configuration for node-based MCP server + pub fn node(script: &str) -> Self { + Self { + command: "node".to_string(), + args: vec![script.to_string()], + env: HashMap::new(), + cwd: None, + } + } + + /// Create configuration for python-based MCP server + pub fn python(script: &str) -> Self { + Self { + command: "python".to_string(), + args: vec![script.to_string()], + env: HashMap::new(), + cwd: None, + } + } + + /// Add environment variable + pub fn env(mut self, key: impl Into, value: impl Into) -> Self { + self.env.insert(key.into(), value.into()); + self + } + + /// Set working directory + pub fn cwd(mut self, cwd: impl Into) -> Self { + self.cwd = Some(cwd.into()); + self + } +} + +/// MCP Transport using stdio +pub struct McpTransport { + config: McpServerConfig, + child: Arc>>, + stdin: Arc>>>, + stdout: Arc>>>, + capabilities: Arc>>, +} + +impl McpTransport { + /// Create new MCP transport + pub fn new(config: McpServerConfig) -> Self { + Self { + config, + child: Arc::new(Mutex::new(None)), + stdin: Arc::new(Mutex::new(None)), + stdout: Arc::new(Mutex::new(None)), + capabilities: Arc::new(Mutex::new(None)), + } + } + + /// Start the MCP server process + pub async fn start(&self) -> Result<()> { + let mut child_guard = self.child.lock().await; + + if child_guard.is_some() { + return Ok(()); // Already started + } + + // Build command + let mut cmd = Command::new(&self.config.command); + cmd.args(&self.config.args); + + // Set environment + for (key, value) in &self.config.env { + cmd.env(key, value); + } + + // Set working directory + if let Some(cwd) = &self.config.cwd { + cmd.current_dir(cwd); + } + + // Configure stdio + cmd.stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::null()); + + // Spawn process + let mut child = cmd.spawn() + .map_err(|e| ZclawError::McpError(format!("Failed to start MCP server: {}", e)))?; + + // Take stdin and stdout + let stdin = child.stdin.take() + .ok_or_else(|| ZclawError::McpError("Failed to get stdin".to_string()))?; + let stdout = child.stdout.take() + .ok_or_else(|| ZclawError::McpError("Failed to get stdout".to_string()))?; + + // Store handles in separate mutexes + *self.stdin.lock().await = Some(BufWriter::new(stdin)); + *self.stdout.lock().await = Some(BufReader::new(stdout)); + *child_guard = Some(child); + + Ok(()) + } + + /// Initialize MCP connection + pub async fn initialize(&self) -> Result<()> { + // Ensure server is started + self.start().await?; + + let request = InitializeRequest::default(); + let _: InitializeResult = self.send_request("initialize", Some(&request)).await?; + + // Store capabilities + let mut capabilities = self.capabilities.lock().await; + *capabilities = Some(ServerCapabilities::default()); + + Ok(()) + } + + /// Send JSON-RPC request + async fn send_request( + &self, + method: &str, + params: Option<&impl serde::Serialize>, + ) -> Result { + // Build request + let id = next_request_id(); + let request = JsonRpcRequest { + jsonrpc: "2.0", + id, + method: method.to_string(), + params: params.map(|p| serde_json::to_value(p).unwrap_or(serde_json::Value::Null)), + }; + + // Serialize request + let line = serde_json::to_string(&request) + .map_err(|e| ZclawError::McpError(format!("Failed to serialize request: {}", e)))?; + + // Write to stdin + { + let mut stdin_guard = self.stdin.lock().await; + let stdin = stdin_guard.as_mut() + .ok_or_else(|| ZclawError::McpError("Transport not started".to_string()))?; + + stdin.write_all(line.as_bytes()) + .map_err(|e| ZclawError::McpError(format!("Failed to write request: {}", e)))?; + stdin.write_all(b"\n") + .map_err(|e| ZclawError::McpError(format!("Failed to write newline: {}", e)))?; + stdin.flush() + .map_err(|e| ZclawError::McpError(format!("Failed to flush request: {}", e)))?; + } + + // Read from stdout + let response_line = { + let mut stdout_guard = self.stdout.lock().await; + let stdout = stdout_guard.as_mut() + .ok_or_else(|| ZclawError::McpError("Transport not started".to_string()))?; + + let mut response_line = String::new(); + stdout.read_line(&mut response_line) + .map_err(|e| ZclawError::McpError(format!("Failed to read response: {}", e)))?; + response_line + }; + + // Parse response + let response: JsonRpcResponse = serde_json::from_str(&response_line) + .map_err(|e| ZclawError::McpError(format!("Failed to parse response: {}", e)))?; + + if let Some(error) = response.error { + return Err(ZclawError::McpError(format!("MCP error {}: {}", error.code, error.message))); + } + + response.result.ok_or_else(|| ZclawError::McpError("No result in response".to_string())) + } + + /// List available tools + pub async fn list_tools(&self) -> Result> { + let result: ListToolsResult = self.send_request("tools/list", None::<&()>).await?; + Ok(result.tools) + } + + /// Call a tool + pub async fn call_tool(&self, name: &str, arguments: serde_json::Value) -> Result { + let params = serde_json::json!({ + "name": name, + "arguments": arguments + }); + let result: CallToolResult = self.send_request("tools/call", Some(¶ms)).await?; + Ok(result) + } + + /// List available resources + pub async fn list_resources(&self) -> Result> { + let result: ListResourcesResult = self.send_request("resources/list", None::<&()>).await?; + Ok(result.resources) + } + + /// Read a resource + pub async fn read_resource(&self, uri: &str) -> Result { + let params = serde_json::json!({ + "uri": uri + }); + let result: ReadResourceResult = self.send_request("resources/read", Some(¶ms)).await?; + Ok(result) + } + + /// List available prompts + pub async fn list_prompts(&self) -> Result> { + let result: ListPromptsResult = self.send_request("prompts/list", None::<&()>).await?; + Ok(result.prompts) + } + + /// Get a prompt + pub async fn get_prompt(&self, name: &str, arguments: Option) -> Result { + let mut params = serde_json::json!({ + "name": name + }); + if let Some(args) = arguments { + params["arguments"] = args; + } + let result: GetPromptResult = self.send_request("prompts/get", Some(¶ms)).await?; + Ok(result) + } +} + +/// Implement McpClient trait for McpTransport +#[async_trait] +impl McpClient for McpTransport { + async fn list_tools(&self) -> Result> { + let tools = McpTransport::list_tools(self).await?; + Ok(tools.into_iter().map(|t| McpTool { + name: t.name, + description: t.description.unwrap_or_default(), + input_schema: t.input_schema, + }).collect()) + } + + async fn call_tool(&self, request: McpToolCallRequest) -> Result { + let args = serde_json::to_value(&request.arguments) + .map_err(|e| ZclawError::McpError(format!("Failed to serialize arguments: {}", e)))?; + + let result = McpTransport::call_tool(self, &request.name, args).await?; + + Ok(McpToolCallResponse { + content: result.content.into_iter().map(|c| match c { + ContentBlock::Text { text } => McpContent::Text { text }, + ContentBlock::Image { data, mime_type } => McpContent::Image { data, mime_type }, + ContentBlock::Resource { resource } => McpContent::Resource { + resource: McpResourceContent { + uri: resource.uri, + mime_type: resource.mime_type, + text: resource.text, + blob: resource.blob, + } + } + }).collect(), + is_error: result.is_error, + }) + } + + async fn list_resources(&self) -> Result> { + let resources = McpTransport::list_resources(self).await?; + Ok(resources.into_iter().map(|r| McpResource { + uri: r.uri, + name: r.name, + description: r.description, + mime_type: r.mime_type, + }).collect()) + } + + async fn read_resource(&self, uri: &str) -> Result { + let result = McpTransport::read_resource(self, uri).await?; + + // Get first content item + let content = result.contents.first() + .ok_or_else(|| ZclawError::McpError("No resource content".to_string()))?; + + Ok(McpResourceContent { + uri: content.uri.clone(), + mime_type: content.mime_type.clone(), + text: content.text.clone(), + blob: content.blob.clone(), + }) + } + + async fn list_prompts(&self) -> Result> { + let prompts = McpTransport::list_prompts(self).await?; + Ok(prompts.into_iter().map(|p| McpPrompt { + name: p.name, + description: p.description.unwrap_or_default(), + arguments: p.arguments.into_iter().map(|a| McpPromptArgument { + name: a.name, + description: a.description.unwrap_or_default(), + required: a.required, + }).collect(), + }).collect()) + } + + async fn get_prompt(&self, name: &str, arguments: HashMap) -> Result { + let args = if arguments.is_empty() { + None + } else { + Some(serde_json::to_value(&arguments) + .map_err(|e| ZclawError::McpError(format!("Failed to serialize arguments: {}", e)))?) + }; + + let result = McpTransport::get_prompt(self, name, args).await?; + + // Combine messages into a string + let prompt_text: Vec = result.messages.into_iter() + .filter_map(|m| match m.content { + ContentBlock::Text { text } => Some(format!("{}: {}", m.role, text)), + _ => None, + }) + .collect(); + + Ok(prompt_text.join("\n")) + } +} diff --git a/crates/zclaw-protocols/src/mcp_types.rs b/crates/zclaw-protocols/src/mcp_types.rs new file mode 100644 index 0000000..ebf3cbd --- /dev/null +++ b/crates/zclaw-protocols/src/mcp_types.rs @@ -0,0 +1,334 @@ +//! MCP JSON-RPC 2.0 types +//! +//! Type definitions for Model Context Protocol communication. + +use serde::{Deserialize, Serialize}; + +// === JSON-RPC Types === + +/// JSON-RPC 2.0 Request +#[derive(Debug, Clone, Serialize)] +pub struct JsonRpcRequest { + pub jsonrpc: &'static str, + pub id: u64, + pub method: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub params: Option, +} + +impl JsonRpcRequest { + pub fn new(id: u64, method: impl Into) -> Self { + Self { + jsonrpc: "2.0", + id, + method: method.into(), + params: None, + } + } + + pub fn with_params(mut self, params: serde_json::Value) -> Self { + self.params = Some(params); + self + } +} + +/// JSON-RPC 2.0 Response (generic for typed results) +#[derive(Debug, Clone, Deserialize)] +pub struct JsonRpcResponse { + pub jsonrpc: String, + pub id: u64, + pub result: Option, + #[serde(default)] + pub error: Option, +} + +/// JSON-RPC Error +#[derive(Debug, Clone, Deserialize)] +pub struct JsonRpcError { + pub code: i32, + pub message: String, + #[serde(default)] + pub data: Option, +} + +// === MCP Initialize === + +/// MCP Initialize Request +#[derive(Debug, Clone, Serialize)] +pub struct InitializeRequest { + pub protocol_version: String, + pub capabilities: ClientCapabilities, + pub client_info: Implementation, +} + +impl Default for InitializeRequest { + fn default() -> Self { + Self { + protocol_version: "2024-11-05".to_string(), + capabilities: ClientCapabilities::default(), + client_info: Implementation { + name: "zclaw".to_string(), + version: env!("CARGO_PKG_VERSION").to_string(), + }, + } + } +} + +/// Client capabilities +#[derive(Debug, Clone, Serialize, Default)] +pub struct ClientCapabilities { + #[serde(skip_serializing_if = "Option::is_none")] + pub roots: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub sampling: Option, +} + +#[derive(Debug, Clone, Serialize)] +pub struct RootsCapability { + #[serde(default)] + pub list_changed: bool, +} + +#[derive(Debug, Clone, Serialize, Default)] +pub struct SamplingCapability {} + +/// Server capabilities (from initialize response) +#[derive(Debug, Clone, Deserialize, Default)] +pub struct ServerCapabilities { + #[serde(default)] + pub tools: Option, + #[serde(default)] + pub resources: Option, + #[serde(default)] + pub prompts: Option, + #[serde(default)] + pub logging: Option, +} + +#[derive(Debug, Clone, Deserialize)] +pub struct ToolsCapability { + #[serde(default)] + pub list_changed: bool, +} + +#[derive(Debug, Clone, Deserialize)] +pub struct ResourcesCapability { + #[serde(default)] + pub subscribe: bool, + #[serde(default)] + pub list_changed: bool, +} + +#[derive(Debug, Clone, Deserialize)] +pub struct PromptsCapability { + #[serde(default)] + pub list_changed: bool, +} + +#[derive(Debug, Clone, Deserialize)] +pub struct LoggingCapability {} + +/// Implementation info +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Implementation { + pub name: String, + pub version: String, +} + +/// Initialize result +#[derive(Debug, Clone, Deserialize)] +pub struct InitializeResult { + pub protocol_version: String, + pub capabilities: ServerCapabilities, + pub server_info: Implementation, +} + +// === MCP Tools === + +/// Tool from tools/list +#[derive(Debug, Clone, Deserialize)] +pub struct Tool { + pub name: String, + #[serde(default)] + pub description: Option, + pub input_schema: serde_json::Value, +} + +/// List tools result +#[derive(Debug, Clone, Deserialize)] +pub struct ListToolsResult { + pub tools: Vec, + #[serde(default)] + pub next_cursor: Option, +} + +/// Call tool request +#[derive(Debug, Clone, Serialize)] +pub struct CallToolRequest { + pub name: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub arguments: Option, +} + +/// Call tool result +#[derive(Debug, Clone, Deserialize)] +pub struct CallToolResult { + pub content: Vec, + #[serde(default)] + pub is_error: bool, +} + +// === MCP Resources === + +/// Resource from resources/list +#[derive(Debug, Clone, Deserialize)] +pub struct Resource { + pub uri: String, + pub name: String, + #[serde(default)] + pub description: Option, + #[serde(default)] + pub mime_type: Option, +} + +/// List resources result +#[derive(Debug, Clone, Deserialize)] +pub struct ListResourcesResult { + pub resources: Vec, + #[serde(default)] + pub next_cursor: Option, +} + +/// Read resource request +#[derive(Debug, Clone, Serialize)] +pub struct ReadResourceRequest { + pub uri: String, +} + +/// Read resource result +#[derive(Debug, Clone, Deserialize)] +pub struct ReadResourceResult { + pub contents: Vec, +} + +/// Resource content +#[derive(Debug, Clone, Deserialize)] +pub struct ResourceContent { + pub uri: String, + #[serde(default)] + pub mime_type: Option, + #[serde(default)] + pub text: Option, + #[serde(default)] + pub blob: Option, +} + +// === MCP Prompts === + +/// Prompt from prompts/list +#[derive(Debug, Clone, Deserialize)] +pub struct Prompt { + pub name: String, + #[serde(default)] + pub description: Option, + #[serde(default)] + pub arguments: Vec, +} + +/// Prompt argument +#[derive(Debug, Clone, Deserialize)] +pub struct PromptArgument { + pub name: String, + #[serde(default)] + pub description: Option, + #[serde(default)] + pub required: bool, +} + +/// List prompts result +#[derive(Debug, Clone, Deserialize)] +pub struct ListPromptsResult { + pub prompts: Vec, + #[serde(default)] + pub next_cursor: Option, +} + +/// Get prompt request +#[derive(Debug, Clone, Serialize)] +pub struct GetPromptRequest { + pub name: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub arguments: Option, +} + +/// Get prompt result +#[derive(Debug, Clone, Deserialize)] +pub struct GetPromptResult { + #[serde(default)] + pub description: Option, + pub messages: Vec, +} + +/// Prompt message +#[derive(Debug, Clone, Deserialize)] +pub struct PromptMessage { + pub role: String, + pub content: ContentBlock, +} + +// === Content Blocks === + +/// Content block for tool results and messages +#[derive(Debug, Clone, Deserialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum ContentBlock { + Text { text: String }, + Image { data: String, mime_type: String }, + Resource { resource: ResourceContent }, +} + +// === Logging === + +/// Set logging level request +#[derive(Debug, Clone, Serialize)] +pub struct SetLevelRequest { + pub level: LoggingLevel, +} + +/// Logging level +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum LoggingLevel { + Debug, + Info, + Notice, + Warning, + Error, + Critical, + Alert, + Emergency, +} + +// === Notifications === + +/// Initialized notification (sent after initialize) +#[derive(Debug, Clone, Serialize)] +pub struct InitializedNotification { + pub jsonrpc: &'static str, + pub method: &'static str, +} + +impl InitializedNotification { + pub fn new() -> Self { + Self { + jsonrpc: "2.0", + method: "notifications/initialized", + } + } +} + +impl Default for InitializedNotification { + fn default() -> Self { + Self::new() + } +} diff --git a/crates/zclaw-types/src/error.rs b/crates/zclaw-types/src/error.rs index 801c0fd..84ae434 100644 --- a/crates/zclaw-types/src/error.rs +++ b/crates/zclaw-types/src/error.rs @@ -46,6 +46,12 @@ pub enum ZclawError { #[error("Internal error: {0}")] Internal(String), + + #[error("Export error: {0}")] + ExportError(String), + + #[error("MCP error: {0}")] + McpError(String), } /// Result type alias for ZCLAW operations