Files
zclaw_openfang/crates/zclaw-runtime/src/tool/builtin/path_validator.rs
iven 5fdf96c3f5 chore: 提交所有工作进度 — SaaS 后端增强、Admin UI、桌面端集成
包含大量 SaaS 平台改进、Admin 管理后台更新、桌面端集成完善、
文档同步、测试文件重构等内容。为 QA 测试准备干净工作树。
2026-03-29 10:46:41 +08:00

467 lines
15 KiB
Rust

//! Path validation for file system tools
//!
//! Provides security validation for file paths to prevent:
//! - Path traversal attacks (../)
//! - Access to blocked system directories
//! - Access outside allowed workspace directories
//!
//! # Security Policy (Default Deny)
//!
//! This validator follows a **default deny** security policy:
//! - If no `allowed_paths` are configured AND no `workspace_root` is set,
//!   all path access is denied by default.
//! - This prevents accidental exposure of sensitive files when the validator
//!   is used without proper configuration.
//! - To enable file access, you MUST either:
//!   1. Set explicit `allowed_paths` in the configuration, OR
//!   2. Configure a `workspace_root` directory.
//!
//! Example configuration:
//! ```ignore
//! let validator = PathValidator::with_config(config)
//!     .with_workspace(PathBuf::from("/safe/workspace"));
//! ```
use std::path::{Path, PathBuf, Component};
use zclaw_types::{Result, ZclawError};
/// Configuration consumed by `PathValidator`.
///
/// The validator applies the blocked list first and the allowed list second,
/// so an entry in `blocked_paths` wins over a matching entry in
/// `allowed_paths`.
#[derive(Debug, Clone)]
pub struct PathValidatorConfig {
/// Directory prefixes under which access is permitted.
///
/// When this list is empty the validator falls back to its workspace root;
/// if no workspace root is set either, every path is denied.
pub allowed_paths: Vec<PathBuf>,
/// Paths that are always denied, even when they also fall under
/// `allowed_paths`.
pub blocked_paths: Vec<PathBuf>,
/// Maximum file size in bytes; `0` disables the limit.
///
/// The limit is only consulted when validating a path for reading.
pub max_file_size: u64,
/// Whether symbolic links are accepted. When `false`, the validator runs
/// its symlink check while resolving a path.
pub allow_symlinks: bool,
}
impl Default for PathValidatorConfig {
fn default() -> Self {
Self {
allowed_paths: Vec::new(),
blocked_paths: default_blocked_paths(),
max_file_size: 10 * 1024 * 1024, // 10MB default
allow_symlinks: false,
}
}
}
impl PathValidatorConfig {
    /// Create a config from `security.toml` settings.
    ///
    /// # Parameters
    /// - `allowed`: directory prefixes to allow; a leading `~` is expanded to
    ///   the home directory.
    /// - `blocked`: additional paths to deny; a leading `~` is expanded the
    ///   same way, so an entry such as `~/.ssh` can match a resolved path.
    ///   The built-in blocked list is always appended after these entries.
    /// - `max_size`: human-readable size such as `"10MB"`; if it cannot be
    ///   parsed the limit falls back to 10 MiB.
    ///
    /// Symbolic links are always disabled in the returned configuration.
    pub fn from_config(allowed: &[String], blocked: &[String], max_size: &str) -> Self {
        // Fallback used when `max_size` is not a parseable size string.
        const DEFAULT_MAX_FILE_SIZE: u64 = 10 * 1024 * 1024;

        let allowed_paths: Vec<PathBuf> = allowed.iter().map(|p| expand_tilde(p)).collect();

        // Blocked entries go through the same tilde expansion as allowed
        // entries; a literal "~/..." would never match a resolved path.
        let blocked_paths: Vec<PathBuf> = blocked
            .iter()
            .map(|p| expand_tilde(p))
            .chain(default_blocked_paths())
            .collect();

        let max_file_size = parse_size(max_size).unwrap_or(DEFAULT_MAX_FILE_SIZE);

        Self {
            allowed_paths,
            blocked_paths,
            max_file_size,
            allow_symlinks: false,
        }
    }
}
/// Built-in list of paths that are denied regardless of configuration.
///
/// The list mixes Unix system files, Windows system locations, SSH key
/// directories and common environment-file names.
fn default_blocked_paths() -> Vec<PathBuf> {
    const BLOCKED: [&str; 13] = [
        // Unix sensitive files
        "/etc/shadow",
        "/etc/passwd",
        "/etc/sudoers",
        "/root",
        "/proc",
        "/sys",
        // Windows sensitive paths
        "C:\\Windows\\System32\\config",
        "C:\\Users\\Administrator",
        // SSH keys
        "/.ssh",
        "/root/.ssh",
        // Environment files
        ".env",
        ".env.local",
        ".env.production",
    ];

    BLOCKED.iter().copied().map(PathBuf::from).collect()
}
/// Replace a leading `~` with the current user's home directory.
///
/// Only a bare `~` or a `~` followed by `/` or `\` is expanded. Any other
/// input (including `~name` forms), or any input when the home directory
/// cannot be determined, is returned unchanged as a `PathBuf`.
fn expand_tilde(path: &str) -> PathBuf {
    let after_tilde = match path.strip_prefix('~') {
        Some(rest) => rest,
        None => return PathBuf::from(path),
    };

    let home = match dirs::home_dir() {
        Some(dir) => dir,
        None => return PathBuf::from(path),
    };

    if after_tilde.is_empty() {
        return home;
    }

    // Accept either separator so Windows-style "~\foo" is handled as well.
    match after_tilde.strip_prefix(|c: char| c == '/' || c == '\\') {
        Some(tail) => home.join(tail),
        None => PathBuf::from(path),
    }
}
/// Parse a human-readable size such as `"10MB"`, `"1 gb"`, `"512KB"`,
/// `"1024B"` or a bare byte count like `"4096"`.
///
/// Matching is case-insensitive and surrounding whitespace (including
/// whitespace between the number and the unit) is ignored. Units are binary:
/// `KB` = 1024, `MB` = 1024², `GB` = 1024³.
///
/// Returns `None` when the numeric part is not a valid `u64`, when the unit
/// suffix is malformed (for example `"10GBGB"`), or when the resulting byte
/// count does not fit in a `u64`.
fn parse_size(s: &str) -> Option<u64> {
    // Longest suffixes first so that "GB"/"MB"/"KB" are not mistaken for "B".
    const UNITS: [(&str, u64); 4] = [
        ("GB", 1024 * 1024 * 1024),
        ("MB", 1024 * 1024),
        ("KB", 1024),
        ("B", 1),
    ];

    let normalized = s.trim().to_uppercase();

    // `strip_suffix` removes the unit exactly once, unlike
    // `trim_end_matches`, which would also accept repeated units.
    let (digits, multiplier) = UNITS
        .iter()
        .find_map(|&(suffix, factor)| {
            normalized
                .strip_suffix(suffix)
                .map(|number| (number.trim(), factor))
        })
        .unwrap_or((normalized.as_str(), 1));

    // `checked_mul` turns an overflowing size into `None` instead of a
    // debug-build panic or a silently wrapped value.
    digits.parse::<u64>().ok()?.checked_mul(multiplier)
}
/// Path validator for file system security.
///
/// Holds the security configuration together with an optional workspace
/// root. The workspace root is consulted when the configuration's
/// `allowed_paths` list is empty.
#[derive(Debug, Clone)]
pub struct PathValidator {
// Blocked/allowed lists, size limit and symlink policy.
config: PathValidatorConfig,
// Optional workspace directory; `None` until `with_workspace` is called.
workspace_root: Option<PathBuf>,
}
impl PathValidator {
/// Create a new path validator with default config
pub fn new() -> Self {
Self {
config: PathValidatorConfig::default(),
workspace_root: None,
}
}
/// Create a path validator with custom config
pub fn with_config(config: PathValidatorConfig) -> Self {
Self {
config,
workspace_root: None,
}
}
/// Set the workspace root directory
pub fn with_workspace(mut self, workspace: PathBuf) -> Self {
self.workspace_root = Some(workspace);
self
}
/// Get the workspace root directory
pub fn workspace_root(&self) -> Option<&PathBuf> {
self.workspace_root.as_ref()
}
/// Validate a path for read access
pub fn validate_read(&self, path: &str) -> Result<PathBuf> {
let canonical = self.resolve_and_validate(path)?;
// Check if file exists
if !canonical.exists() {
return Err(ZclawError::InvalidInput(format!(
"File does not exist: {}",
path
)));
}
// Check if it's a file (not directory)
if !canonical.is_file() {
return Err(ZclawError::InvalidInput(format!(
"Path is not a file: {}",
path
)));
}
// Check file size
if self.config.max_file_size > 0 {
if let Ok(metadata) = std::fs::metadata(&canonical) {
if metadata.len() > self.config.max_file_size {
return Err(ZclawError::InvalidInput(format!(
"File too large: {} bytes (max: {} bytes)",
metadata.len(),
self.config.max_file_size
)));
}
}
}
Ok(canonical)
}
/// Validate a path for write access
pub fn validate_write(&self, path: &str) -> Result<PathBuf> {
let canonical = self.resolve_and_validate(path)?;
// Check parent directory exists
if let Some(parent) = canonical.parent() {
if !parent.exists() {
return Err(ZclawError::InvalidInput(format!(
"Parent directory does not exist: {}",
parent.display()
)));
}
}
// If file exists, check it's not blocked
if canonical.exists() && !canonical.is_file() {
return Err(ZclawError::InvalidInput(format!(
"Path exists but is not a file: {}",
path
)));
}
Ok(canonical)
}
/// Resolve and validate a path
fn resolve_and_validate(&self, path: &str) -> Result<PathBuf> {
// Expand tilde
let expanded = expand_tilde(path);
let path_buf = PathBuf::from(&expanded);
// Check for path traversal
self.check_path_traversal(&path_buf)?;
// Resolve to canonical path
let canonical = if path_buf.exists() {
path_buf
.canonicalize()
.map_err(|e| ZclawError::InvalidInput(format!("Cannot resolve path: {}", e)))?
} else {
// For non-existent files, resolve parent and join
let parent = path_buf.parent().unwrap_or(Path::new("."));
let canonical_parent = parent
.canonicalize()
.map_err(|e| ZclawError::InvalidInput(format!("Cannot resolve parent path: {}", e)))?;
canonical_parent.join(path_buf.file_name().unwrap_or_default())
};
// Check blocked paths
self.check_blocked(&canonical)?;
// Check allowed paths
self.check_allowed(&canonical)?;
// Check symlinks
if !self.config.allow_symlinks {
self.check_symlink(&canonical)?;
}
Ok(canonical)
}
/// Check for path traversal attacks
fn check_path_traversal(&self, path: &Path) -> Result<()> {
for component in path.components() {
if let Component::ParentDir = component {
// Allow .. if workspace is configured (will be validated in check_allowed)
// Deny .. if no workspace is configured (more restrictive)
if self.workspace_root.is_none() {
// Without workspace, be more restrictive
return Err(ZclawError::InvalidInput(
"Path traversal not allowed outside workspace".to_string()
));
}
}
}
Ok(())
}
/// Check if path is in blocked list
fn check_blocked(&self, path: &Path) -> Result<()> {
for blocked in &self.config.blocked_paths {
if path.starts_with(blocked) || path == blocked {
return Err(ZclawError::InvalidInput(format!(
"Access to this path is blocked: {}",
path.display()
)));
}
}
Ok(())
}
/// Check if path is in allowed list
///
/// # Security: Default Deny Policy
///
/// This method implements a strict default-deny security policy:
/// - If `allowed_paths` is empty AND no `workspace_root` is configured,
/// access is **denied by default** with a clear error message
/// - This prevents accidental exposure of the entire filesystem
/// when the validator is misconfigured or used without setup
fn check_allowed(&self, path: &Path) -> Result<()> {
// If no allowed paths specified, check workspace
if self.config.allowed_paths.is_empty() {
if let Some(ref workspace) = self.workspace_root {
// Workspace is configured - validate path is within it
if !path.starts_with(workspace) {
return Err(ZclawError::InvalidInput(format!(
"Path outside workspace: {} (workspace: {})",
path.display(),
workspace.display()
)));
}
return Ok(());
} else {
// SECURITY: No allowed_paths AND no workspace_root configured
// Default to DENY - do not allow unrestricted filesystem access
return Err(ZclawError::InvalidInput(
"Path access denied: no workspace or allowed paths configured. \
To enable file access, configure either 'allowed_paths' in security.toml \
or set a workspace_root directory."
.to_string(),
));
}
}
// Check against allowed paths
for allowed in &self.config.allowed_paths {
if path.starts_with(allowed) {
return Ok(());
}
}
Err(ZclawError::InvalidInput(format!(
"Path not in allowed directories: {}",
path.display()
)))
}
/// Check for symbolic links
fn check_symlink(&self, path: &Path) -> Result<()> {
if path.exists() {
let metadata = std::fs::symlink_metadata(path)
.map_err(|e| ZclawError::InvalidInput(format!("Cannot read path metadata: {}", e)))?;
if metadata.file_type().is_symlink() {
return Err(ZclawError::InvalidInput(
"Symbolic links are not allowed".to_string()
));
}
}
Ok(())
}
}
impl Default for PathValidator {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Each supported unit suffix is converted with binary multipliers.
    #[test]
    fn test_parse_size() {
        let ten_mib: u64 = 10 * 1024 * 1024;
        let one_gib: u64 = 1024 * 1024 * 1024;
        let half_mib: u64 = 512 * 1024;

        assert_eq!(parse_size("10MB"), Some(ten_mib));
        assert_eq!(parse_size("1GB"), Some(one_gib));
        assert_eq!(parse_size("512KB"), Some(half_mib));
        assert_eq!(parse_size("1024B"), Some(1024));
    }

    /// `~` and `~/...` resolve under the home directory; absolute paths are
    /// passed through untouched.
    #[test]
    fn test_expand_tilde() {
        let home_dir = dirs::home_dir().unwrap_or_default();

        assert_eq!(expand_tilde("~"), home_dir);
        assert!(expand_tilde("~/test").starts_with(&home_dir));

        let absolute = PathBuf::from("/absolute/path");
        assert_eq!(expand_tilde("/absolute/path"), absolute);
    }

    /// Well-known sensitive files are rejected by a default validator.
    #[test]
    fn test_blocked_paths() {
        let validator = PathValidator::new();

        for sensitive in ["/etc/shadow", "/etc/passwd"] {
            assert!(validator.resolve_and_validate(sensitive).is_err());
        }
    }

    /// Without a workspace, any `..` component is refused.
    #[test]
    fn test_path_traversal() {
        let no_workspace = PathValidator::new();
        let outcome = no_workspace.resolve_and_validate("../../../etc/passwd");
        assert!(outcome.is_err());
    }

    /// SECURITY TEST: with neither `allowed_paths` nor a workspace root, the
    /// allow-list check must deny access and explain why.
    #[test]
    fn test_default_deny_without_configuration() {
        let validator = PathValidator::new();

        let outcome = validator.check_allowed(Path::new("/some/random/path"));
        assert!(outcome.is_err(), "Expected denial when no configuration is set");

        let message = outcome.unwrap_err().to_string();
        assert!(
            message.contains("no workspace or allowed paths configured"),
            "Error message should explain configuration requirement, got: {}",
            message
        );
    }

    /// A path beneath the configured workspace root passes the allow check.
    #[test]
    fn test_allows_with_workspace_root() {
        let workspace = std::env::temp_dir();
        let inside = workspace.join("test_file.txt");
        let validator = PathValidator::new().with_workspace(workspace);

        let outcome = validator.check_allowed(&inside);
        assert!(outcome.is_ok(), "Path within workspace should be allowed");
    }

    /// A path beneath an explicit `allowed_paths` entry passes the allow check.
    #[test]
    fn test_allows_with_explicit_allowed_paths() {
        let temp_dir = std::env::temp_dir();
        let inside = temp_dir.join("test_file.txt");

        let validator = PathValidator::with_config(PathValidatorConfig {
            allowed_paths: vec![temp_dir],
            blocked_paths: vec![],
            max_file_size: 0,
            allow_symlinks: false,
        });

        let outcome = validator.check_allowed(&inside);
        assert!(outcome.is_ok(), "Path in allowed_paths should be allowed");
    }

    /// A path outside the workspace root is denied with a descriptive error.
    #[test]
    fn test_denies_outside_workspace() {
        let validator = PathValidator::new().with_workspace(PathBuf::from("/safe/workspace"));

        let outcome = validator.check_allowed(Path::new("/other/location"));
        assert!(outcome.is_err(), "Path outside workspace should be denied");

        let message = outcome.unwrap_err().to_string();
        assert!(
            message.contains("Path outside workspace"),
            "Error should indicate path is outside workspace, got: {}",
            message
        );
    }
}