Files
zclaw_openfang/crates/zclaw-pipeline/src/actions/mod.rs
iven e3b93ff96d fix(security): implement all 15 security fixes from penetration test V1
Security audit (2026-03-31): 5 HIGH + 10 MEDIUM issues, all fixed.

HIGH:
- H1: JWT password_version mechanism (pwv in Claims, middleware verification,
  auto-increment on password change)
- H2: Docker saas port bound to 127.0.0.1
- H3: TOTP encryption key decoupled from JWT secret (production bailout)
- H4+H5: Tauri CSP hardened (removed unsafe-inline, restricted connect-src)

MEDIUM:
- M1: Persistent rate limiting (PostgreSQL rate_limit_events table)
- M2: Account lockout (5 failures -> 15min lock)
- M3: RFC 5322 email validation with regex
- M4: Device registration typed struct with length limits
- M5: Provider URL validation on create/update (SSRF prevention)
- M6: Legacy TOTP secret migration (fixed nonce -> random nonce)
- M7: Legacy frontend crypto migration (static salt -> random salt)
- M8+M9: Admin frontend: removed JS token storage, HttpOnly cookie only
- M10: Pipeline debug log sanitization (keys only, 100-char truncation)

Also: fixed CLAUDE.md Section 12 (was corrupted), added title.rs middleware
skeleton, fixed RegisterDeviceRequest visibility.
2026-04-01 08:38:37 +08:00

442 lines
13 KiB
Rust

//! Pipeline actions module
//!
//! Built-in actions that can be used in pipelines.
mod llm;
mod parallel;
mod render;
mod export;
mod http;
mod orchestration;
pub use llm::*;
pub use parallel::*;
pub use render::*;
pub use export::*;
pub use http::*;
pub use orchestration::*;
use std::collections::HashMap;
use std::sync::Arc;
use serde_json::Value;
use async_trait::async_trait;
use crate::types::ExportFormat;
/// Error type returned by the built-in pipeline actions in this module.
///
/// String-carrying variants wrap a human-readable message; `Io` and `Json`
/// are converted automatically from their source errors via `?`.
#[derive(Debug, thiserror::Error)]
pub enum ActionError {
/// The LLM driver reported a failure, or no LLM driver is configured.
#[error("LLM error: {0}")]
Llm(String),
/// The skill driver reported a failure, or no skill registry is configured.
#[error("Skill error: {0}")]
Skill(String),
/// The hand driver reported a failure, or no hand registry is configured.
#[error("Hand error: {0}")]
Hand(String),
/// Rendering failed.
// NOTE(review): not constructed in the visible part of this file —
// presumably used by the `render` submodule; confirm.
#[error("Render error: {0}")]
Render(String),
/// Export failed or the requested export format is not available.
#[error("Export error: {0}")]
Export(String),
/// An HTTP request could not be built, sent, or read.
#[error("HTTP error: {0}")]
Http(String),
/// Underlying filesystem / IO failure (auto-converted from `std::io::Error`).
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
/// JSON (de)serialization failure (auto-converted from `serde_json::Error`).
#[error("JSON error: {0}")]
Json(#[from] serde_json::Error),
/// A template file could not be read; carries the path as requested by the caller.
#[error("Template not found: {0}")]
TemplateNotFound(String),
/// Caller-supplied input was rejected as invalid.
#[error("Invalid input: {0}")]
InvalidInput(String),
/// The orchestration driver reported a failure, or no driver is configured.
#[error("Orchestration error: {0}")]
Orchestration(String),
}
/// Action registry - holds references to all action executors.
///
/// Every driver is optional: the registry starts empty (see `new`) and the
/// `with_*` builder methods attach drivers. An `execute_*` call whose driver
/// is missing returns the corresponding `ActionError` variant instead of
/// panicking.
pub struct ActionRegistry {
/// LLM driver (injected from runtime); used by `execute_llm`.
llm_driver: Option<Arc<dyn LlmActionDriver>>,
/// Skill registry (injected from kernel); used by `execute_skill`.
skill_registry: Option<Arc<dyn SkillActionDriver>>,
/// Hand registry (injected from kernel); used by `execute_hand`.
hand_registry: Option<Arc<dyn HandActionDriver>>,
/// Orchestration driver (injected from kernel); used by `execute_orchestration`.
orchestration_driver: Option<Arc<dyn OrchestrationActionDriver>>,
/// Template directory; when set, template paths are joined onto it
/// before being read, otherwise they are used as given.
template_dir: Option<std::path::PathBuf>,
}
impl ActionRegistry {
/// Build an empty registry: no drivers attached and no template directory.
///
/// Attach collaborators afterwards with the `with_*` builder methods.
pub fn new() -> Self {
    let empty = Self {
        template_dir: None,
        orchestration_driver: None,
        hand_registry: None,
        skill_registry: None,
        llm_driver: None,
    };
    empty
}
/// Set LLM driver
pub fn with_llm_driver(mut self, driver: Arc<dyn LlmActionDriver>) -> Self {
self.llm_driver = Some(driver);
self
}
/// Set skill registry
pub fn with_skill_registry(mut self, registry: Arc<dyn SkillActionDriver>) -> Self {
self.skill_registry = Some(registry);
self
}
/// Set hand registry
pub fn with_hand_registry(mut self, registry: Arc<dyn HandActionDriver>) -> Self {
self.hand_registry = Some(registry);
self
}
/// Set orchestration driver
pub fn with_orchestration_driver(mut self, driver: Arc<dyn OrchestrationActionDriver>) -> Self {
self.orchestration_driver = Some(driver);
self
}
/// Set template directory
pub fn with_template_dir(mut self, dir: std::path::PathBuf) -> Self {
self.template_dir = Some(dir);
self
}
/// Run an LLM generation through the configured driver.
///
/// `template` is either an inline prompt or a template file reference: it is
/// treated as a file path (and read via `load_template`) when it ends with
/// `.md` or contains a `/`; otherwise it is used verbatim as the prompt.
/// All remaining arguments are forwarded unchanged to the driver.
///
/// # Errors
/// * `ActionError::Llm` when no driver is configured or the driver fails.
/// * Whatever `load_template` returns when the template file cannot be loaded.
///
/// Only input *keys* and lengths are logged, never prompt or input contents.
// NOTE(review): an inline prompt that happens to contain '/' is also routed
// to the file loader — confirm this heuristic is intended.
pub async fn execute_llm(
    &self,
    template: &str,
    input: HashMap<String, Value>,
    model: Option<String>,
    temperature: Option<f32>,
    max_tokens: Option<u32>,
    json_mode: bool,
) -> Result<Value, ActionError> {
    tracing::debug!(target: "pipeline_actions", "execute_llm: Called with template length: {}", template.len());
    tracing::debug!(target: "pipeline_actions", "execute_llm: input keys ({}): {:?}", input.len(), input.keys().collect::<Vec<_>>());

    // Guard clause: nothing can be generated without a driver.
    let driver = match self.llm_driver.as_ref() {
        Some(driver) => driver,
        None => {
            return Err(ActionError::Llm("LLM 驱动未配置,请在设置中配置模型与 API".to_string()));
        }
    };

    // Load template if it's a file path, otherwise take it as the prompt itself.
    let looks_like_path = template.ends_with(".md") || template.contains('/');
    let prompt = if looks_like_path {
        self.load_template(template)?
    } else {
        template.to_string()
    };

    tracing::debug!(target: "pipeline_actions", "execute_llm: Calling driver.generate with prompt length: {}", prompt.len());
    let outcome = driver
        .generate(prompt, input, model, temperature, max_tokens, json_mode)
        .await;
    outcome.map_err(ActionError::Llm)
}
/// Run the skill identified by `skill_id` with the given input map.
///
/// # Errors
/// `ActionError::Skill` when no skill registry is configured or when the
/// driver reports a failure (its message is wrapped unchanged).
pub async fn execute_skill(
    &self,
    skill_id: &str,
    input: HashMap<String, Value>,
) -> Result<Value, ActionError> {
    let registry = self
        .skill_registry
        .as_ref()
        .ok_or_else(|| ActionError::Skill("技能注册表未初始化".to_string()))?;
    let outcome = registry.execute(skill_id, input).await;
    outcome.map_err(ActionError::Skill)
}
/// Run `action` on the hand identified by `hand_id` with the given parameters.
///
/// # Errors
/// `ActionError::Hand` when no hand registry is configured or when the
/// driver reports a failure (its message is wrapped unchanged).
pub async fn execute_hand(
    &self,
    hand_id: &str,
    action: &str,
    params: HashMap<String, Value>,
) -> Result<Value, ActionError> {
    let registry = self
        .hand_registry
        .as_ref()
        .ok_or_else(|| ActionError::Hand("Hand 注册表未初始化".to_string()))?;
    let outcome = registry.execute(hand_id, action, params).await;
    outcome.map_err(ActionError::Hand)
}
/// Run a skill orchestration graph through the configured driver.
///
/// `graph_id`, `graph` and `input` are forwarded to the driver as-is; how the
/// driver chooses between an id and an inline graph is up to the driver.
///
/// # Errors
/// `ActionError::Orchestration` when no driver is configured or when the
/// driver reports a failure (its message is wrapped unchanged).
pub async fn execute_orchestration(
    &self,
    graph_id: Option<&str>,
    graph: Option<&Value>,
    input: HashMap<String, Value>,
) -> Result<Value, ActionError> {
    let driver = self
        .orchestration_driver
        .as_ref()
        .ok_or_else(|| ActionError::Orchestration("编排驱动未初始化".to_string()))?;
    let outcome = driver.execute(graph_id, graph, input).await;
    outcome.map_err(ActionError::Orchestration)
}
/// Render classroom data.
///
/// Placeholder: the classroom renderer is not wired in yet, so the input is
/// handed back as an owned copy without modification. Never fails today.
pub async fn render_classroom(&self, data: &Value) -> Result<Value, ActionError> {
    let passthrough: Value = data.to_owned();
    Ok(passthrough)
}
/// Export files
pub async fn export_files(
&self,
formats: &[ExportFormat],
data: &Value,
output_dir: Option<&str>,
) -> Result<Value, ActionError> {
let mut paths = Vec::new();
let dir = output_dir
.map(std::path::PathBuf::from)
.unwrap_or_else(|| std::env::temp_dir());
for format in formats {
let path = self.export_single(format, data, &dir).await?;
paths.push(path);
}
Ok(serde_json::to_value(paths).unwrap_or(Value::Null))
}
/// Write `data` to a single file in `dir` using `format`, returning the path.
///
/// The file is named `output_<UTC timestamp>.<ext>` with second resolution,
/// so two exports of the same format within one second target the same file.
/// JSON, Markdown and HTML are rendered and written; PPTX and PDF are not
/// available and yield `ActionError::Export` without writing anything.
async fn export_single(
    &self,
    format: &ExportFormat,
    data: &Value,
    dir: &std::path::Path,
) -> Result<String, ActionError> {
    let stamp = chrono::Utc::now().format("%Y%m%d_%H%M%S");
    let target = dir.join(format!("output_{}.{}", stamp, format.extension()));

    // Render first; unsupported formats bail out before any file is created.
    let rendered: String = match format {
        ExportFormat::Json => serde_json::to_string_pretty(data)?,
        ExportFormat::Markdown => self.render_markdown(data)?,
        ExportFormat::Html => self.render_html(data)?,
        ExportFormat::Pptx => {
            return Err(ActionError::Export(
                "PPTX 导出暂不可用。桌面端可通过 Pipeline 结果面板使用 JSON 格式导出后转换。".to_string(),
            ));
        }
        ExportFormat::Pdf => {
            return Err(ActionError::Export(
                "PDF 导出暂不可用。桌面端可通过 Pipeline 结果面板使用 HTML 格式导出后通过浏览器打印为 PDF。".to_string(),
            ));
        }
    };

    tokio::fs::write(&target, rendered).await?;
    Ok(target.to_string_lossy().into_owned())
}
/// Perform an HTTP request and return its status code and body text.
///
/// `method` is matched case-insensitively against GET, POST, PUT, DELETE and
/// PATCH. Every entry of `headers` is set on the request, and `body`, when
/// present, is sent as JSON. The result is `{"status": <u16>, "body": <text>}`;
/// non-2xx statuses are returned as data, not as errors.
///
/// # Errors
/// `ActionError::Http` for an unsupported method, a send failure, or a
/// failure while reading the response body.
// NOTE(review): a fresh client is built per call with no explicit timeout and
// no URL validation — consider a shared client, a timeout, and an SSRF
// allow-list if `url` can come from untrusted pipeline definitions.
pub async fn http_request(
    &self,
    url: &str,
    method: &str,
    headers: &HashMap<String, String>,
    body: Option<&Value>,
) -> Result<Value, ActionError> {
    let client = reqwest::Client::new();

    let verb = match method.to_uppercase().as_str() {
        "GET" => reqwest::Method::GET,
        "POST" => reqwest::Method::POST,
        "PUT" => reqwest::Method::PUT,
        "DELETE" => reqwest::Method::DELETE,
        "PATCH" => reqwest::Method::PATCH,
        _ => return Err(ActionError::Http(format!("Unsupported HTTP method: {}", method))),
    };

    let with_headers = headers
        .iter()
        .fold(client.request(verb, url), |req, (name, value)| req.header(name, value));
    let request = match body {
        Some(payload) => with_headers.json(payload),
        None => with_headers,
    };

    let response = request
        .send()
        .await
        .map_err(|e| ActionError::Http(e.to_string()))?;

    // Capture the status before `text()` consumes the response.
    let code = response.status().as_u16();
    let text = response
        .text()
        .await
        .map_err(|e| ActionError::Http(e.to_string()))?;

    Ok(serde_json::json!({
        "status": code,
        "body": text,
    }))
}
/// Load a template file and return its contents.
///
/// When a template directory is configured, `path` is resolved relative to
/// it and must stay inside it: absolute paths and paths containing `..`
/// are rejected. (Previously `dir.join(path)` let an absolute `path` replace
/// the directory entirely and let `..` climb out of it, so a pipeline
/// definition could read arbitrary files.) Without a template directory the
/// path is used exactly as given.
///
/// # Errors
/// * `ActionError::InvalidInput` when `path` would escape the configured
///   template directory.
/// * `ActionError::TemplateNotFound` (carrying the requested `path`) when the
///   file cannot be read for any reason.
// NOTE(review): this performs a blocking read and is called from async code;
// the check is lexical only and does not resolve symlinks.
fn load_template(&self, path: &str) -> Result<String, ActionError> {
    let template_path = match &self.template_dir {
        Some(dir) => {
            let escapes_dir = std::path::Path::new(path).components().any(|part| {
                matches!(
                    part,
                    std::path::Component::ParentDir
                        | std::path::Component::RootDir
                        | std::path::Component::Prefix(_)
                )
            });
            if escapes_dir {
                return Err(ActionError::InvalidInput(format!(
                    "Template path must stay inside the template directory: {}",
                    path
                )));
            }
            dir.join(path)
        }
        None => std::path::PathBuf::from(path),
    };

    std::fs::read_to_string(&template_path)
        .map_err(|_| ActionError::TemplateNotFound(path.to_string()))
}
/// Render data to markdown
fn render_markdown(&self, data: &Value) -> Result<String, ActionError> {
// Simple markdown rendering
let mut md = String::new();
if let Some(title) = data.get("title").and_then(|v| v.as_str()) {
md.push_str(&format!("# {}\n\n", title));
}
if let Some(items) = data.get("items").and_then(|v| v.as_array()) {
for item in items {
if let Some(text) = item.as_str() {
md.push_str(&format!("- {}\n", text));
}
}
}
Ok(md)
}
/// Render data to HTML
fn render_html(&self, data: &Value) -> Result<String, ActionError> {
let mut html = String::from("<!DOCTYPE html><html><head><meta charset=\"utf-8\"><title>Export</title></head><body>");
if let Some(title) = data.get("title").and_then(|v| v.as_str()) {
html.push_str(&format!("<h1>{}</h1>", escape_html(title)));
}
if let Some(items) = data.get("items").and_then(|v| v.as_array()) {
html.push_str("<ul>");
for item in items {
if let Some(text) = item.as_str() {
html.push_str(&format!("<li>{}</li>", escape_html(text)));
}
}
html.push_str("</ul>");
}
html.push_str("</body></html>");
Ok(html)
}
}
/// Replace the five HTML-significant characters (`<`, `>`, `&`, `"`, `'`)
/// with their entity forms so the text can be embedded in markup without
/// being interpreted as tags or attributes. All other characters pass
/// through unchanged.
fn escape_html(s: &str) -> String {
    s.chars().fold(String::with_capacity(s.len()), |mut out, c| {
        match c {
            '&' => out.push_str("&amp;"),
            '<' => out.push_str("&lt;"),
            '>' => out.push_str("&gt;"),
            '"' => out.push_str("&quot;"),
            '\'' => out.push_str("&#39;"),
            other => out.push(other),
        }
        out
    })
}
impl ExportFormat {
fn extension(&self) -> &'static str {
match self {
ExportFormat::Pptx => "pptx",
ExportFormat::Html => "html",
ExportFormat::Pdf => "pdf",
ExportFormat::Markdown => "md",
ExportFormat::Json => "json",
}
}
}
impl Default for ActionRegistry {
fn default() -> Self {
Self::new()
}
}
/// LLM action driver trait.
///
/// Abstraction over the component that actually talks to a language model.
/// `ActionRegistry::execute_llm` resolves the prompt and then forwards all
/// arguments to `generate` unchanged.
#[async_trait]
pub trait LlmActionDriver: Send + Sync {
/// Generate a response for `prompt`.
///
/// `input` holds the pipeline values made available to the generation;
/// `model`, `temperature` and `max_tokens` are optional overrides and
/// `json_mode` is passed through as given — their exact interpretation is
/// defined by the implementation.
///
/// On failure the returned message is wrapped into `ActionError::Llm` by
/// the registry.
async fn generate(
&self,
prompt: String,
input: HashMap<String, Value>,
model: Option<String>,
temperature: Option<f32>,
max_tokens: Option<u32>,
json_mode: bool,
) -> Result<Value, String>;
}
/// Skill action driver trait.
///
/// Abstraction over the skill registry; `ActionRegistry::execute_skill`
/// delegates to it.
#[async_trait]
pub trait SkillActionDriver: Send + Sync {
/// Execute the skill identified by `skill_id` with the given `input` map.
///
/// On failure the returned message is wrapped into `ActionError::Skill`
/// by the registry.
async fn execute(
&self,
skill_id: &str,
input: HashMap<String, Value>,
) -> Result<Value, String>;
}
/// Hand action driver trait.
///
/// Abstraction over the hand registry; `ActionRegistry::execute_hand`
/// delegates to it.
#[async_trait]
pub trait HandActionDriver: Send + Sync {
/// Execute `action` on the hand identified by `hand_id` with `params`.
///
/// On failure the returned message is wrapped into `ActionError::Hand`
/// by the registry.
async fn execute(
&self,
hand_id: &str,
action: &str,
params: HashMap<String, Value>,
) -> Result<Value, String>;
}
/// Orchestration action driver trait.
///
/// Abstraction over the skill-orchestration engine;
/// `ActionRegistry::execute_orchestration` delegates to it.
#[async_trait]
pub trait OrchestrationActionDriver: Send + Sync {
/// Execute an orchestration graph with the given `input` map.
///
/// Both `graph_id` and `graph` are optional.
// NOTE(review): presumably `graph_id` references a stored graph and `graph`
// is an inline definition, with at least one expected — confirm against
// the implementation.
///
/// On failure the returned message is wrapped into
/// `ActionError::Orchestration` by the registry.
async fn execute(
&self,
graph_id: Option<&str>,
graph: Option<&Value>,
input: HashMap<String, Value>,
) -> Result<Value, String>;
}