diff --git a/crates/zclaw-kernel/src/director.rs b/crates/zclaw-kernel/src/director.rs index 88eeef3..ae5264c 100644 --- a/crates/zclaw-kernel/src/director.rs +++ b/crates/zclaw-kernel/src/director.rs @@ -12,7 +12,7 @@ use std::sync::Arc; use serde::{Deserialize, Serialize}; -use tokio::sync::{RwLock, Mutex, mpsc}; +use tokio::sync::{RwLock, Mutex, mpsc, oneshot}; use zclaw_types::{AgentId, Result, ZclawError}; use zclaw_protocols::{A2aEnvelope, A2aMessageType, A2aRecipient, A2aRouter, A2aAgentProfile, A2aCapability}; use zclaw_runtime::{LlmDriver, CompletionRequest}; @@ -199,9 +199,9 @@ pub struct Director { director_id: AgentId, /// Optional LLM driver for intelligent scheduling llm_driver: Option>, - /// Inbox for receiving responses (stores pending request IDs and their response channels) - pending_requests: Arc>>>, - /// Receiver for incoming messages + /// Pending request response channels (request_id → oneshot sender) + pending_requests: Arc>>>, + /// Receiver for incoming messages (consumed by inbox reader task) inbox: Arc>>>, } @@ -481,13 +481,16 @@ Respond with ONLY the number (1-{}) of the agent who should speak next. No expla } /// Send message to selected agent and wait for response + /// + /// Uses oneshot channels to avoid deadlock: each call creates its own + /// response channel, and a shared inbox reader dispatches responses. pub async fn send_to_agent( &self, agent: &DirectorAgent, message: String, ) -> Result { - // Create a response channel for this request - let (_response_tx, mut _response_rx) = mpsc::channel::(1); + // Create a oneshot channel for this specific request's response + let (response_tx, response_rx) = oneshot::channel::(); let envelope = A2aEnvelope::new( self.director_id.clone(), @@ -500,50 +503,32 @@ Respond with ONLY the number (1-{}) of the agent who should speak next. No expla }), ); - // Store the request ID with its response channel + // Store the oneshot sender so the inbox reader can dispatch to it let request_id = envelope.id.clone(); { let mut pending = self.pending_requests.lock().await; - pending.insert(request_id.clone(), _response_tx); + pending.insert(request_id.clone(), response_tx); } // Send the request self.router.route(envelope).await?; - // Wait for response with timeout + // Ensure the inbox reader is running + self.ensure_inbox_reader().await; + + // Wait for response on our dedicated oneshot channel with timeout let timeout_duration = std::time::Duration::from_secs(self.config.response_timeout); - let request_id_clone = request_id.clone(); - let response = tokio::time::timeout(timeout_duration, async { - // Poll the inbox for responses - let mut inbox_guard = self.inbox.lock().await; - if let Some(ref mut rx) = *inbox_guard { - while let Some(msg) = rx.recv().await { - // Check if this is a response to our request - if msg.message_type == A2aMessageType::Response { - if let Some(ref reply_to) = msg.reply_to { - if reply_to == &request_id_clone { - // Found our response - return Some(msg); - } - } - } - // Not our response, continue waiting - // (In a real implementation, we'd re-queue non-matching messages) - } - } - None - }).await; + let response = tokio::time::timeout(timeout_duration, response_rx).await; - // Clean up pending request + // Clean up pending request (sender already consumed on success) { let mut pending = self.pending_requests.lock().await; pending.remove(&request_id); } match response { - Ok(Some(envelope)) => { - // Extract response text from payload + Ok(Ok(envelope)) => { let response_text = envelope.payload .get("response") .and_then(|v: &serde_json::Value| v.as_str()) @@ -551,7 +536,7 @@ Respond with ONLY the number (1-{}) of the agent who should speak next. No expla .to_string(); Ok(response_text) } - Ok(None) => { + Ok(Err(_)) => { Err(ZclawError::Timeout("No response received".into())) } Err(_) => { @@ -563,6 +548,44 @@ Respond with ONLY the number (1-{}) of the agent who should speak next. No expla } } + /// Ensure the inbox reader task is running. + /// The inbox reader continuously reads from the shared inbox channel + /// and dispatches each response to the correct oneshot sender. + async fn ensure_inbox_reader(&self) { + // Quick check: if inbox has already been taken, reader is running + { + let inbox = self.inbox.lock().await; + if inbox.is_none() { + return; // Reader already spawned and consumed the receiver + } + } + + // Take the receiver out (only once) + let rx = { + let mut inbox = self.inbox.lock().await; + inbox.take() + }; + + if let Some(mut rx) = rx { + let pending = self.pending_requests.clone(); + tokio::spawn(async move { + while let Some(msg) = rx.recv().await { + // Find and dispatch to the correct oneshot sender + if msg.message_type == A2aMessageType::Response { + if let Some(ref reply_to) = msg.reply_to { + let mut pending_guard = pending.lock().await; + if let Some(sender) = pending_guard.remove(reply_to) { + // Send the response; if receiver already dropped, that's fine + let _ = sender.send(msg); + } + } + } + // Non-response messages are dropped (notifications, etc.) + } + }); + } + } + /// Broadcast message to all agents pub async fn broadcast(&self, message: String) -> Result<()> { let envelope = A2aEnvelope::new( diff --git a/crates/zclaw-pipeline/src/executor.rs b/crates/zclaw-pipeline/src/executor.rs index 0a9d105..a58a016 100644 --- a/crates/zclaw-pipeline/src/executor.rs +++ b/crates/zclaw-pipeline/src/executor.rs @@ -40,6 +40,15 @@ pub enum ExecuteError { Io(#[from] std::io::Error), } +/// Maximum completed/failed/cancelled runs to keep in memory +const MAX_COMPLETED_RUNS: usize = 100; + +/// Maximum allowed delay in milliseconds (60 seconds) +const MAX_DELAY_MS: u64 = 60_000; + +/// Default per-step timeout (5 minutes) +const DEFAULT_STEP_TIMEOUT_SECS: u64 = 300; + /// Pipeline executor pub struct PipelineExecutor { /// Action registry @@ -107,35 +116,50 @@ impl PipelineExecutor { // Create execution context let mut context = ExecutionContext::new(inputs); + // Determine per-step timeout from pipeline spec (0 means use default) + let step_timeout = if pipeline.spec.timeout_secs > 0 { + pipeline.spec.timeout_secs + } else { + DEFAULT_STEP_TIMEOUT_SECS + }; + // Execute steps - let result = self.execute_steps(pipeline, &mut context, &run_id).await; + let result = self.execute_steps(pipeline, &mut context, &run_id, step_timeout).await; // Update run state - let mut runs = self.runs.write().await; - if let Some(run) = runs.get_mut(&run_id) { - match result { - Ok(outputs) => { - run.status = RunStatus::Completed; - run.outputs = Some(serde_json::to_value(&outputs).unwrap_or(Value::Null)); - } - Err(e) => { - run.status = RunStatus::Failed; - run.error = Some(e.to_string()); + let return_value = { + let mut runs = self.runs.write().await; + if let Some(run) = runs.get_mut(&run_id) { + match result { + Ok(outputs) => { + run.status = RunStatus::Completed; + run.outputs = Some(serde_json::to_value(&outputs).unwrap_or(Value::Null)); + } + Err(e) => { + run.status = RunStatus::Failed; + run.error = Some(e.to_string()); + } } + run.ended_at = Some(Utc::now()); + Ok(run.clone()) + } else { + Err(ExecuteError::Action("执行后未找到运行记录".to_string())) } - run.ended_at = Some(Utc::now()); - return Ok(run.clone()); - } + }; - Err(ExecuteError::Action("执行后未找到运行记录".to_string())) + // Auto-cleanup old completed runs (after releasing the write lock) + self.cleanup().await; + + return_value } - /// Execute pipeline steps + /// Execute pipeline steps with per-step timeout async fn execute_steps( &self, pipeline: &Pipeline, context: &mut ExecutionContext, run_id: &str, + step_timeout_secs: u64, ) -> Result, ExecuteError> { let total_steps = pipeline.spec.steps.len(); @@ -161,8 +185,15 @@ impl PipelineExecutor { tracing::info!("Executing step {} ({}/{})", step.id, idx + 1, total_steps); - // Execute action - let result = self.execute_action(&step.action, context).await?; + // Execute action with per-step timeout + let timeout_duration = std::time::Duration::from_secs(step_timeout_secs); + let result = tokio::time::timeout( + timeout_duration, + self.execute_action(&step.action, context), + ).await.map_err(|_| { + tracing::error!("Step {} timed out after {}s", step.id, step_timeout_secs); + ExecuteError::Timeout + })??; // Store result context.set_output(&step.id, result.clone()); @@ -336,7 +367,16 @@ impl PipelineExecutor { } Action::Delay { ms } => { - tokio::time::sleep(tokio::time::Duration::from_millis(*ms)).await; + let capped_ms = if *ms > MAX_DELAY_MS { + tracing::warn!( + "Delay ms {} exceeds max {}, capping to {}", + ms, MAX_DELAY_MS, MAX_DELAY_MS + ); + MAX_DELAY_MS + } else { + *ms + }; + tokio::time::sleep(tokio::time::Duration::from_millis(capped_ms)).await; Ok(Value::Null) } @@ -508,6 +548,33 @@ impl PipelineExecutor { pub async fn list_runs(&self) -> Vec { self.runs.read().await.values().cloned().collect() } + + /// Clean up old completed/failed/cancelled runs to prevent memory leaks. + /// Keeps at most MAX_COMPLETED_RUNS finished runs, evicting the oldest first. + pub async fn cleanup(&self) { + let mut runs = self.runs.write().await; + + // Collect IDs of finished runs (completed, failed, cancelled) + let mut finished: Vec<(String, chrono::DateTime)> = runs + .iter() + .filter(|(_, r)| matches!(r.status, RunStatus::Completed | RunStatus::Failed | RunStatus::Cancelled)) + .map(|(id, r)| (id.clone(), r.ended_at.unwrap_or(r.started_at))) + .collect(); + + let to_remove = finished.len().saturating_sub(MAX_COMPLETED_RUNS); + if to_remove > 0 { + // Sort by end time ascending (oldest first) + finished.sort_by_key(|(_, t)| *t); + for (id, _) in finished.into_iter().take(to_remove) { + runs.remove(&id); + // Also clean up cancellation flag + drop(runs); + self.cancellations.write().await.remove(&id); + runs = self.runs.write().await; + } + tracing::debug!("Cleaned up {} old pipeline runs", to_remove); + } + } } #[cfg(test)] diff --git a/crates/zclaw-saas/src/workers/cleanup_rate_limit.rs b/crates/zclaw-saas/src/workers/cleanup_rate_limit.rs index 9a600fd..cedd724 100644 --- a/crates/zclaw-saas/src/workers/cleanup_rate_limit.rs +++ b/crates/zclaw-saas/src/workers/cleanup_rate_limit.rs @@ -1,4 +1,7 @@ //! 清理过期 Rate Limit 条目 Worker +//! +//! rate_limit_events 表中的持久化条目会无限增长。 +//! 此 Worker 定期删除超过 1 小时的旧条目,防止数据库膨胀。 use async_trait::async_trait; use sqlx::PgPool; @@ -21,10 +24,31 @@ impl Worker for CleanupRateLimitWorker { "cleanup_rate_limit" } - async fn perform(&self, _db: &PgPool, _args: Self::Args) -> SaasResult<()> { - // Rate limit entries are in-memory (DashMap), not in DB - // This worker is a placeholder for when rate limits are persisted - // Currently the cleanup happens in main.rs background task + async fn perform(&self, db: &PgPool, args: Self::Args) -> SaasResult<()> { + let retention_secs = args.window_secs.max(3600); // 至少保留 1 小时 + + let result = sqlx::query( + "DELETE FROM rate_limit_events WHERE created_at < NOW() - ($1 || ' seconds')::interval" + ) + .bind(retention_secs.to_string()) + .execute(db) + .await; + + match result { + Ok(r) => { + let deleted = r.rows_affected(); + if deleted > 0 { + tracing::info!( + "[cleanup_rate_limit] Deleted {} expired rate limit events (retention: {}s)", + deleted, retention_secs + ); + } + } + Err(e) => { + tracing::error!("[cleanup_rate_limit] Failed to clean up rate limit events: {}", e); + } + } + Ok(()) } }