fix: 发布前审计 Batch 1 — Pipeline 内存泄漏/超时 + Director 死锁 + Rate Limit Worker
Pipeline executor:
- 添加 cleanup() 方法,MAX_COMPLETED_RUNS=100 上限淘汰旧记录
- 每步执行添加 tokio::time::timeout(使用 PipelineSpec.timeout_secs,默认 300s)
- Delay ms 上限 60000,超出 warn 并截断

Director send_to_agent:
- 重构为 oneshot::channel 响应模式,避免 inbox + pending_requests 锁竞争
- 添加 ensure_inbox_reader() 独立任务分发响应到对应 oneshot sender

cleanup_rate_limit Worker:
- 实现 Worker body: DELETE FROM rate_limit_events WHERE created_at < NOW() - 保留时长(保留时长取 args.window_secs,最少 3600 秒,即 1 小时)

651 tests passed, 0 failed
This commit is contained in:
@@ -12,7 +12,7 @@
|
|||||||
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use tokio::sync::{RwLock, Mutex, mpsc};
|
use tokio::sync::{RwLock, Mutex, mpsc, oneshot};
|
||||||
use zclaw_types::{AgentId, Result, ZclawError};
|
use zclaw_types::{AgentId, Result, ZclawError};
|
||||||
use zclaw_protocols::{A2aEnvelope, A2aMessageType, A2aRecipient, A2aRouter, A2aAgentProfile, A2aCapability};
|
use zclaw_protocols::{A2aEnvelope, A2aMessageType, A2aRecipient, A2aRouter, A2aAgentProfile, A2aCapability};
|
||||||
use zclaw_runtime::{LlmDriver, CompletionRequest};
|
use zclaw_runtime::{LlmDriver, CompletionRequest};
|
||||||
@@ -199,9 +199,9 @@ pub struct Director {
|
|||||||
director_id: AgentId,
|
director_id: AgentId,
|
||||||
/// Optional LLM driver for intelligent scheduling
|
/// Optional LLM driver for intelligent scheduling
|
||||||
llm_driver: Option<Arc<dyn LlmDriver>>,
|
llm_driver: Option<Arc<dyn LlmDriver>>,
|
||||||
/// Inbox for receiving responses (stores pending request IDs and their response channels)
|
/// Pending request response channels (request_id → oneshot sender)
|
||||||
pending_requests: Arc<Mutex<std::collections::HashMap<String, mpsc::Sender<A2aEnvelope>>>>,
|
pending_requests: Arc<Mutex<std::collections::HashMap<String, oneshot::Sender<A2aEnvelope>>>>,
|
||||||
/// Receiver for incoming messages
|
/// Receiver for incoming messages (consumed by inbox reader task)
|
||||||
inbox: Arc<Mutex<Option<mpsc::Receiver<A2aEnvelope>>>>,
|
inbox: Arc<Mutex<Option<mpsc::Receiver<A2aEnvelope>>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -481,13 +481,16 @@ Respond with ONLY the number (1-{}) of the agent who should speak next. No expla
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Send message to selected agent and wait for response
|
/// Send message to selected agent and wait for response
|
||||||
|
///
|
||||||
|
/// Uses oneshot channels to avoid deadlock: each call creates its own
|
||||||
|
/// response channel, and a shared inbox reader dispatches responses.
|
||||||
pub async fn send_to_agent(
|
pub async fn send_to_agent(
|
||||||
&self,
|
&self,
|
||||||
agent: &DirectorAgent,
|
agent: &DirectorAgent,
|
||||||
message: String,
|
message: String,
|
||||||
) -> Result<String> {
|
) -> Result<String> {
|
||||||
// Create a response channel for this request
|
// Create a oneshot channel for this specific request's response
|
||||||
let (_response_tx, mut _response_rx) = mpsc::channel::<A2aEnvelope>(1);
|
let (response_tx, response_rx) = oneshot::channel::<A2aEnvelope>();
|
||||||
|
|
||||||
let envelope = A2aEnvelope::new(
|
let envelope = A2aEnvelope::new(
|
||||||
self.director_id.clone(),
|
self.director_id.clone(),
|
||||||
@@ -500,50 +503,32 @@ Respond with ONLY the number (1-{}) of the agent who should speak next. No expla
|
|||||||
}),
|
}),
|
||||||
);
|
);
|
||||||
|
|
||||||
// Store the request ID with its response channel
|
// Store the oneshot sender so the inbox reader can dispatch to it
|
||||||
let request_id = envelope.id.clone();
|
let request_id = envelope.id.clone();
|
||||||
{
|
{
|
||||||
let mut pending = self.pending_requests.lock().await;
|
let mut pending = self.pending_requests.lock().await;
|
||||||
pending.insert(request_id.clone(), _response_tx);
|
pending.insert(request_id.clone(), response_tx);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send the request
|
// Send the request
|
||||||
self.router.route(envelope).await?;
|
self.router.route(envelope).await?;
|
||||||
|
|
||||||
// Wait for response with timeout
|
// Ensure the inbox reader is running
|
||||||
|
self.ensure_inbox_reader().await;
|
||||||
|
|
||||||
|
// Wait for response on our dedicated oneshot channel with timeout
|
||||||
let timeout_duration = std::time::Duration::from_secs(self.config.response_timeout);
|
let timeout_duration = std::time::Duration::from_secs(self.config.response_timeout);
|
||||||
let request_id_clone = request_id.clone();
|
|
||||||
|
|
||||||
let response = tokio::time::timeout(timeout_duration, async {
|
let response = tokio::time::timeout(timeout_duration, response_rx).await;
|
||||||
// Poll the inbox for responses
|
|
||||||
let mut inbox_guard = self.inbox.lock().await;
|
|
||||||
if let Some(ref mut rx) = *inbox_guard {
|
|
||||||
while let Some(msg) = rx.recv().await {
|
|
||||||
// Check if this is a response to our request
|
|
||||||
if msg.message_type == A2aMessageType::Response {
|
|
||||||
if let Some(ref reply_to) = msg.reply_to {
|
|
||||||
if reply_to == &request_id_clone {
|
|
||||||
// Found our response
|
|
||||||
return Some(msg);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Not our response, continue waiting
|
|
||||||
// (In a real implementation, we'd re-queue non-matching messages)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
None
|
|
||||||
}).await;
|
|
||||||
|
|
||||||
// Clean up pending request
|
// Clean up pending request (sender already consumed on success)
|
||||||
{
|
{
|
||||||
let mut pending = self.pending_requests.lock().await;
|
let mut pending = self.pending_requests.lock().await;
|
||||||
pending.remove(&request_id);
|
pending.remove(&request_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
match response {
|
match response {
|
||||||
Ok(Some(envelope)) => {
|
Ok(Ok(envelope)) => {
|
||||||
// Extract response text from payload
|
|
||||||
let response_text = envelope.payload
|
let response_text = envelope.payload
|
||||||
.get("response")
|
.get("response")
|
||||||
.and_then(|v: &serde_json::Value| v.as_str())
|
.and_then(|v: &serde_json::Value| v.as_str())
|
||||||
@@ -551,7 +536,7 @@ Respond with ONLY the number (1-{}) of the agent who should speak next. No expla
|
|||||||
.to_string();
|
.to_string();
|
||||||
Ok(response_text)
|
Ok(response_text)
|
||||||
}
|
}
|
||||||
Ok(None) => {
|
Ok(Err(_)) => {
|
||||||
Err(ZclawError::Timeout("No response received".into()))
|
Err(ZclawError::Timeout("No response received".into()))
|
||||||
}
|
}
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
@@ -563,6 +548,44 @@ Respond with ONLY the number (1-{}) of the agent who should speak next. No expla
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Ensure the inbox reader task is running.
|
||||||
|
/// The inbox reader continuously reads from the shared inbox channel
|
||||||
|
/// and dispatches each response to the correct oneshot sender.
|
||||||
|
async fn ensure_inbox_reader(&self) {
|
||||||
|
// Quick check: if inbox has already been taken, reader is running
|
||||||
|
{
|
||||||
|
let inbox = self.inbox.lock().await;
|
||||||
|
if inbox.is_none() {
|
||||||
|
return; // Reader already spawned and consumed the receiver
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Take the receiver out (only once)
|
||||||
|
let rx = {
|
||||||
|
let mut inbox = self.inbox.lock().await;
|
||||||
|
inbox.take()
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some(mut rx) = rx {
|
||||||
|
let pending = self.pending_requests.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
while let Some(msg) = rx.recv().await {
|
||||||
|
// Find and dispatch to the correct oneshot sender
|
||||||
|
if msg.message_type == A2aMessageType::Response {
|
||||||
|
if let Some(ref reply_to) = msg.reply_to {
|
||||||
|
let mut pending_guard = pending.lock().await;
|
||||||
|
if let Some(sender) = pending_guard.remove(reply_to) {
|
||||||
|
// Send the response; if receiver already dropped, that's fine
|
||||||
|
let _ = sender.send(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Non-response messages are dropped (notifications, etc.)
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Broadcast message to all agents
|
/// Broadcast message to all agents
|
||||||
pub async fn broadcast(&self, message: String) -> Result<()> {
|
pub async fn broadcast(&self, message: String) -> Result<()> {
|
||||||
let envelope = A2aEnvelope::new(
|
let envelope = A2aEnvelope::new(
|
||||||
|
|||||||
@@ -40,6 +40,15 @@ pub enum ExecuteError {
|
|||||||
Io(#[from] std::io::Error),
|
Io(#[from] std::io::Error),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Maximum completed/failed/cancelled runs to keep in memory
|
||||||
|
const MAX_COMPLETED_RUNS: usize = 100;
|
||||||
|
|
||||||
|
/// Maximum allowed delay in milliseconds (60 seconds)
|
||||||
|
const MAX_DELAY_MS: u64 = 60_000;
|
||||||
|
|
||||||
|
/// Default per-step timeout (5 minutes)
|
||||||
|
const DEFAULT_STEP_TIMEOUT_SECS: u64 = 300;
|
||||||
|
|
||||||
/// Pipeline executor
|
/// Pipeline executor
|
||||||
pub struct PipelineExecutor {
|
pub struct PipelineExecutor {
|
||||||
/// Action registry
|
/// Action registry
|
||||||
@@ -107,10 +116,18 @@ impl PipelineExecutor {
|
|||||||
// Create execution context
|
// Create execution context
|
||||||
let mut context = ExecutionContext::new(inputs);
|
let mut context = ExecutionContext::new(inputs);
|
||||||
|
|
||||||
|
// Determine per-step timeout from pipeline spec (0 means use default)
|
||||||
|
let step_timeout = if pipeline.spec.timeout_secs > 0 {
|
||||||
|
pipeline.spec.timeout_secs
|
||||||
|
} else {
|
||||||
|
DEFAULT_STEP_TIMEOUT_SECS
|
||||||
|
};
|
||||||
|
|
||||||
// Execute steps
|
// Execute steps
|
||||||
let result = self.execute_steps(pipeline, &mut context, &run_id).await;
|
let result = self.execute_steps(pipeline, &mut context, &run_id, step_timeout).await;
|
||||||
|
|
||||||
// Update run state
|
// Update run state
|
||||||
|
let return_value = {
|
||||||
let mut runs = self.runs.write().await;
|
let mut runs = self.runs.write().await;
|
||||||
if let Some(run) = runs.get_mut(&run_id) {
|
if let Some(run) = runs.get_mut(&run_id) {
|
||||||
match result {
|
match result {
|
||||||
@@ -124,18 +141,25 @@ impl PipelineExecutor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
run.ended_at = Some(Utc::now());
|
run.ended_at = Some(Utc::now());
|
||||||
return Ok(run.clone());
|
Ok(run.clone())
|
||||||
}
|
} else {
|
||||||
|
|
||||||
Err(ExecuteError::Action("执行后未找到运行记录".to_string()))
|
Err(ExecuteError::Action("执行后未找到运行记录".to_string()))
|
||||||
}
|
}
|
||||||
|
};
|
||||||
|
|
||||||
/// Execute pipeline steps
|
// Auto-cleanup old completed runs (after releasing the write lock)
|
||||||
|
self.cleanup().await;
|
||||||
|
|
||||||
|
return_value
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Execute pipeline steps with per-step timeout
|
||||||
async fn execute_steps(
|
async fn execute_steps(
|
||||||
&self,
|
&self,
|
||||||
pipeline: &Pipeline,
|
pipeline: &Pipeline,
|
||||||
context: &mut ExecutionContext,
|
context: &mut ExecutionContext,
|
||||||
run_id: &str,
|
run_id: &str,
|
||||||
|
step_timeout_secs: u64,
|
||||||
) -> Result<HashMap<String, Value>, ExecuteError> {
|
) -> Result<HashMap<String, Value>, ExecuteError> {
|
||||||
let total_steps = pipeline.spec.steps.len();
|
let total_steps = pipeline.spec.steps.len();
|
||||||
|
|
||||||
@@ -161,8 +185,15 @@ impl PipelineExecutor {
|
|||||||
|
|
||||||
tracing::info!("Executing step {} ({}/{})", step.id, idx + 1, total_steps);
|
tracing::info!("Executing step {} ({}/{})", step.id, idx + 1, total_steps);
|
||||||
|
|
||||||
// Execute action
|
// Execute action with per-step timeout
|
||||||
let result = self.execute_action(&step.action, context).await?;
|
let timeout_duration = std::time::Duration::from_secs(step_timeout_secs);
|
||||||
|
let result = tokio::time::timeout(
|
||||||
|
timeout_duration,
|
||||||
|
self.execute_action(&step.action, context),
|
||||||
|
).await.map_err(|_| {
|
||||||
|
tracing::error!("Step {} timed out after {}s", step.id, step_timeout_secs);
|
||||||
|
ExecuteError::Timeout
|
||||||
|
})??;
|
||||||
|
|
||||||
// Store result
|
// Store result
|
||||||
context.set_output(&step.id, result.clone());
|
context.set_output(&step.id, result.clone());
|
||||||
@@ -336,7 +367,16 @@ impl PipelineExecutor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
Action::Delay { ms } => {
|
Action::Delay { ms } => {
|
||||||
tokio::time::sleep(tokio::time::Duration::from_millis(*ms)).await;
|
let capped_ms = if *ms > MAX_DELAY_MS {
|
||||||
|
tracing::warn!(
|
||||||
|
"Delay ms {} exceeds max {}, capping to {}",
|
||||||
|
ms, MAX_DELAY_MS, MAX_DELAY_MS
|
||||||
|
);
|
||||||
|
MAX_DELAY_MS
|
||||||
|
} else {
|
||||||
|
*ms
|
||||||
|
};
|
||||||
|
tokio::time::sleep(tokio::time::Duration::from_millis(capped_ms)).await;
|
||||||
Ok(Value::Null)
|
Ok(Value::Null)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -508,6 +548,33 @@ impl PipelineExecutor {
|
|||||||
pub async fn list_runs(&self) -> Vec<PipelineRun> {
|
pub async fn list_runs(&self) -> Vec<PipelineRun> {
|
||||||
self.runs.read().await.values().cloned().collect()
|
self.runs.read().await.values().cloned().collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Clean up old completed/failed/cancelled runs to prevent memory leaks.
|
||||||
|
/// Keeps at most MAX_COMPLETED_RUNS finished runs, evicting the oldest first.
|
||||||
|
pub async fn cleanup(&self) {
|
||||||
|
let mut runs = self.runs.write().await;
|
||||||
|
|
||||||
|
// Collect IDs of finished runs (completed, failed, cancelled)
|
||||||
|
let mut finished: Vec<(String, chrono::DateTime<Utc>)> = runs
|
||||||
|
.iter()
|
||||||
|
.filter(|(_, r)| matches!(r.status, RunStatus::Completed | RunStatus::Failed | RunStatus::Cancelled))
|
||||||
|
.map(|(id, r)| (id.clone(), r.ended_at.unwrap_or(r.started_at)))
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
let to_remove = finished.len().saturating_sub(MAX_COMPLETED_RUNS);
|
||||||
|
if to_remove > 0 {
|
||||||
|
// Sort by end time ascending (oldest first)
|
||||||
|
finished.sort_by_key(|(_, t)| *t);
|
||||||
|
for (id, _) in finished.into_iter().take(to_remove) {
|
||||||
|
runs.remove(&id);
|
||||||
|
// Also clean up cancellation flag
|
||||||
|
drop(runs);
|
||||||
|
self.cancellations.write().await.remove(&id);
|
||||||
|
runs = self.runs.write().await;
|
||||||
|
}
|
||||||
|
tracing::debug!("Cleaned up {} old pipeline runs", to_remove);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|||||||
@@ -1,4 +1,7 @@
|
|||||||
//! 清理过期 Rate Limit 条目 Worker
|
//! 清理过期 Rate Limit 条目 Worker
|
||||||
|
//!
|
||||||
|
//! rate_limit_events 表中的持久化条目会无限增长。
|
||||||
|
//! 此 Worker 定期删除超过 1 小时的旧条目,防止数据库膨胀。
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use sqlx::PgPool;
|
use sqlx::PgPool;
|
||||||
@@ -21,10 +24,31 @@ impl Worker for CleanupRateLimitWorker {
|
|||||||
"cleanup_rate_limit"
|
"cleanup_rate_limit"
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn perform(&self, _db: &PgPool, _args: Self::Args) -> SaasResult<()> {
|
async fn perform(&self, db: &PgPool, args: Self::Args) -> SaasResult<()> {
|
||||||
// Rate limit entries are in-memory (DashMap), not in DB
|
let retention_secs = args.window_secs.max(3600); // 至少保留 1 小时
|
||||||
// This worker is a placeholder for when rate limits are persisted
|
|
||||||
// Currently the cleanup happens in main.rs background task
|
let result = sqlx::query(
|
||||||
|
"DELETE FROM rate_limit_events WHERE created_at < NOW() - ($1 || ' seconds')::interval"
|
||||||
|
)
|
||||||
|
.bind(retention_secs.to_string())
|
||||||
|
.execute(db)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
match result {
|
||||||
|
Ok(r) => {
|
||||||
|
let deleted = r.rows_affected();
|
||||||
|
if deleted > 0 {
|
||||||
|
tracing::info!(
|
||||||
|
"[cleanup_rate_limit] Deleted {} expired rate limit events (retention: {}s)",
|
||||||
|
deleted, retention_secs
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!("[cleanup_rate_limit] Failed to clean up rate limit events: {}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user