use sea_orm::{ConnectionTrait, DatabaseConnection, FromQueryResult, Statement, Value}; use uuid::Uuid; use crate::error::{PluginError, PluginResult}; use crate::manifest::PluginEntity; /// 消毒标识符:只保留 ASCII 字母、数字、下划线,防止 SQL 注入 fn sanitize_identifier(input: &str) -> String { input .chars() .map(|c| if c.is_ascii_alphanumeric() || c == '_' { c } else { '_' }) .collect() } /// 动态表管理器 — 处理插件动态创建/删除的数据库表 pub struct DynamicTableManager; impl DynamicTableManager { /// 生成动态表名: `plugin_{sanitized_id}_{sanitized_entity}` pub fn table_name(plugin_id: &str, entity_name: &str) -> String { let sanitized_id = sanitize_identifier(plugin_id); let sanitized_entity = sanitize_identifier(entity_name); format!("plugin_{}_{}", sanitized_id, sanitized_entity) } /// 创建动态表 pub async fn create_table( db: &DatabaseConnection, plugin_id: &str, entity: &PluginEntity, ) -> PluginResult<()> { let table_name = Self::table_name(plugin_id, &entity.name); // 创建表 let create_sql = format!( "CREATE TABLE IF NOT EXISTS \"{table_name}\" (\ \"id\" UUID PRIMARY KEY, \ \"tenant_id\" UUID NOT NULL, \ \"data\" JSONB NOT NULL DEFAULT '{{}}', \ \"created_at\" TIMESTAMPTZ NOT NULL DEFAULT NOW(), \ \"updated_at\" TIMESTAMPTZ NOT NULL DEFAULT NOW(), \ \"created_by\" UUID, \ \"updated_by\" UUID, \ \"deleted_at\" TIMESTAMPTZ, \ \"version\" INT NOT NULL DEFAULT 1)" ); db.execute(Statement::from_string( sea_orm::DatabaseBackend::Postgres, create_sql, )) .await .map_err(|e| PluginError::DatabaseError(e.to_string()))?; // 创建租户索引 let tenant_idx_sql = format!( "CREATE INDEX IF NOT EXISTS \"idx_{t}_tenant\" ON \"{table_name}\" (\"tenant_id\") WHERE \"deleted_at\" IS NULL", t = sanitize_identifier(&table_name) ); db.execute(Statement::from_string( sea_orm::DatabaseBackend::Postgres, tenant_idx_sql, )) .await .map_err(|e| PluginError::DatabaseError(e.to_string()))?; // 为字段创建索引(使用参数化方式避免 SQL 注入) for field in &entity.fields { if field.unique || field.required { let sanitized_field = sanitize_identifier(&field.name); let idx_name = format!( "idx_{}_{}_{}", sanitize_identifier(&table_name), sanitized_field, if field.unique { "uniq" } else { "idx" } ); // unique 字段使用 CREATE UNIQUE INDEX,由数据库保证数据完整性 let idx_sql = if field.unique { format!( "CREATE UNIQUE INDEX IF NOT EXISTS \"{idx_name}\" ON \"{table_name}\" (\"data\"->>'{sanitized_field}') WHERE \"deleted_at\" IS NULL" ) } else { format!( "CREATE INDEX IF NOT EXISTS \"{idx_name}\" ON \"{table_name}\" (\"data\"->>'{sanitized_field}') WHERE \"deleted_at\" IS NULL" ) }; db.execute(Statement::from_string( sea_orm::DatabaseBackend::Postgres, idx_sql, )) .await .map_err(|e| PluginError::DatabaseError(e.to_string()))?; } } // 为 searchable 字段创建 B-tree 索引以加速 ILIKE 前缀查询 for field in &entity.fields { if field.searchable == Some(true) { let sanitized_field = sanitize_identifier(&field.name); let idx_name = format!("{}_{}_sidx", sanitize_identifier(&table_name), sanitized_field); let idx_sql = format!( "CREATE INDEX IF NOT EXISTS \"{}\" ON \"{}\" (\"data\"->>'{}') WHERE \"deleted_at\" IS NULL", idx_name, table_name, sanitized_field ); db.execute(Statement::from_string( sea_orm::DatabaseBackend::Postgres, idx_sql, )) .await .map_err(|e| PluginError::DatabaseError(e.to_string()))?; } } tracing::info!(table = %table_name, "Dynamic table created"); Ok(()) } /// 删除动态表 pub async fn drop_table( db: &DatabaseConnection, plugin_id: &str, entity_name: &str, ) -> PluginResult<()> { let table_name = Self::table_name(plugin_id, entity_name); let sql = format!("DROP TABLE IF EXISTS \"{}\"", table_name); db.execute(Statement::from_string( sea_orm::DatabaseBackend::Postgres, sql, )) .await .map_err(|e| PluginError::DatabaseError(e.to_string()))?; tracing::info!(table = %table_name, "Dynamic table dropped"); Ok(()) } /// 检查表是否存在 pub async fn table_exists(db: &DatabaseConnection, table_name: &str) -> PluginResult { #[derive(FromQueryResult)] struct ExistsResult { exists: bool, } let result = ExistsResult::find_by_statement(Statement::from_sql_and_values( sea_orm::DatabaseBackend::Postgres, "SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = $1)", [table_name.into()], )) .one(db) .await .map_err(|e| PluginError::DatabaseError(e.to_string()))?; Ok(result.map(|r| r.exists).unwrap_or(false)) } /// 构建 INSERT SQL pub fn build_insert_sql( table_name: &str, tenant_id: Uuid, user_id: Uuid, data: &serde_json::Value, ) -> (String, Vec) { let id = Uuid::now_v7(); Self::build_insert_sql_with_id(table_name, id, tenant_id, user_id, data) } /// 构建 INSERT SQL(指定 ID) pub fn build_insert_sql_with_id( table_name: &str, id: Uuid, tenant_id: Uuid, user_id: Uuid, data: &serde_json::Value, ) -> (String, Vec) { let sql = format!( "INSERT INTO \"{}\" (id, tenant_id, data, created_by, updated_by, version) \ VALUES ($1, $2, $3, $4, $5, 1) \ RETURNING id, tenant_id, data, created_at, updated_at, version", table_name ); let values = vec![ id.into(), tenant_id.into(), serde_json::to_string(data).unwrap_or_default().into(), user_id.into(), user_id.into(), ]; (sql, values) } /// 构建 SELECT SQL pub fn build_query_sql( table_name: &str, tenant_id: Uuid, limit: u64, offset: u64, ) -> (String, Vec) { let sql = format!( "SELECT id, data, created_at, updated_at, version \ FROM \"{}\" \ WHERE tenant_id = $1 AND deleted_at IS NULL \ ORDER BY created_at DESC \ LIMIT $2 OFFSET $3", table_name ); let values = vec![tenant_id.into(), (limit as i64).into(), (offset as i64).into()]; (sql, values) } /// 构建 COUNT SQL pub fn build_count_sql(table_name: &str, tenant_id: Uuid) -> (String, Vec) { let sql = format!( "SELECT COUNT(*) as count FROM \"{}\" WHERE tenant_id = $1 AND deleted_at IS NULL", table_name ); let values = vec![tenant_id.into()]; (sql, values) } /// 构建 UPDATE SQL(含乐观锁) pub fn build_update_sql( table_name: &str, id: Uuid, tenant_id: Uuid, user_id: Uuid, data: &serde_json::Value, version: i32, ) -> (String, Vec) { let sql = format!( "UPDATE \"{}\" \ SET data = $1, updated_at = NOW(), updated_by = $2, version = version + 1 \ WHERE id = $3 AND tenant_id = $4 AND version = $5 AND deleted_at IS NULL \ RETURNING id, data, created_at, updated_at, version", table_name ); let values = vec![ serde_json::to_string(data).unwrap_or_default().into(), user_id.into(), id.into(), tenant_id.into(), version.into(), ]; (sql, values) } /// 构建 DELETE SQL(软删除) pub fn build_delete_sql( table_name: &str, id: Uuid, tenant_id: Uuid, ) -> (String, Vec) { let sql = format!( "UPDATE \"{}\" \ SET deleted_at = NOW(), updated_at = NOW() \ WHERE id = $1 AND tenant_id = $2 AND deleted_at IS NULL", table_name ); let values = vec![id.into(), tenant_id.into()]; (sql, values) } /// 构建单条查询 SQL pub fn build_get_by_id_sql( table_name: &str, id: Uuid, tenant_id: Uuid, ) -> (String, Vec) { let sql = format!( "SELECT id, data, created_at, updated_at, version \ FROM \"{}\" \ WHERE id = $1 AND tenant_id = $2 AND deleted_at IS NULL", table_name ); let values = vec![id.into(), tenant_id.into()]; (sql, values) } /// 构建唯一索引 SQL(供测试验证) pub fn build_unique_index_sql(table_name: &str, field_name: &str) -> String { let sanitized_field = sanitize_identifier(field_name); let idx_name = format!( "idx_{}_{}_uniq", sanitize_identifier(table_name), sanitized_field ); format!( "CREATE UNIQUE INDEX IF NOT EXISTS \"{}\" ON \"{}\" (\"data\"->>'{}') WHERE \"deleted_at\" IS NULL", idx_name, table_name, sanitized_field ) } /// 构建带过滤条件的查询 SQL pub fn build_filtered_query_sql( table_name: &str, tenant_id: Uuid, limit: u64, offset: u64, filter: Option, search: Option<(String, String)>, // (searchable_fields_csv, keyword) sort_by: Option, sort_order: Option, ) -> Result<(String, Vec), String> { let mut conditions = vec![ format!("\"tenant_id\" = ${}", 1), "\"deleted_at\" IS NULL".to_string(), ]; let mut param_idx = 2; let mut values: Vec = vec![tenant_id.into()]; // 处理 filter if let Some(f) = filter { if let Some(obj) = f.as_object() { for (key, val) in obj { let clean_key = sanitize_identifier(key); if clean_key.is_empty() { return Err(format!("无效的过滤字段名: {}", key)); } conditions.push(format!("\"data\"->>'{}' = ${}", clean_key, param_idx)); values.push(Value::String(Some(Box::new( val.as_str().unwrap_or("").to_string(), )))); param_idx += 1; } } } // 处理 search — 所有 searchable 字段共享同一个 ILIKE 参数 if let Some((fields_csv, keyword)) = search { let escaped = keyword.replace('%', "\\%").replace('_', "\\_"); let fields: Vec<&str> = fields_csv.split(',').collect(); let search_param_idx = param_idx; let search_conditions: Vec = fields .iter() .map(|f| { let clean = sanitize_identifier(f.trim()); format!("\"data\"->>'{}' ILIKE ${}", clean, search_param_idx) }) .collect(); conditions.push(format!("({})", search_conditions.join(" OR "))); values.push(Value::String(Some(Box::new(format!("%{}%", escaped))))); param_idx += 1; } // 处理 sort let order_clause = if let Some(sb) = sort_by { let clean = sanitize_identifier(&sb); if clean.is_empty() { return Err(format!("无效的排序字段名: {}", sb)); } let dir = match sort_order.as_deref() { Some("asc") | Some("ASC") => "ASC", _ => "DESC", }; format!("ORDER BY \"data\"->>'{}' {}", clean, dir) } else { "ORDER BY \"created_at\" DESC".to_string() }; let sql = format!( "SELECT id, data, created_at, updated_at, version FROM \"{}\" WHERE {} {} LIMIT ${} OFFSET ${}", table_name, conditions.join(" AND "), order_clause, param_idx, param_idx + 1, ); values.push((limit as i64).into()); values.push((offset as i64).into()); Ok((sql, values)) } } #[cfg(test)] mod tests { use super::*; #[test] fn test_unique_index_sql_uses_create_unique_index() { let sql = DynamicTableManager::build_unique_index_sql("plugin_test", "code"); assert!( sql.contains("CREATE UNIQUE INDEX"), "Expected UNIQUE index, got: {}", sql ); } #[test] fn test_build_filtered_query_sql_with_filter() { let (sql, values) = DynamicTableManager::build_filtered_query_sql( "plugin_test_customer", Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap(), 20, 0, Some(serde_json::json!({"customer_id": "abc-123"})), None, None, None, ) .unwrap(); assert!( sql.contains("\"data\"->>'customer_id' ="), "Expected filter in SQL, got: {}", sql ); assert!(sql.contains("tenant_id"), "Expected tenant_id filter"); // 验证参数值 assert_eq!(values.len(), 4); // tenant_id + filter_value + limit + offset } #[test] fn test_build_filtered_query_sql_sanitizes_keys() { let result = DynamicTableManager::build_filtered_query_sql( "plugin_test", Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap(), 20, 0, Some(serde_json::json!({"evil'; DROP TABLE--": "value"})), None, None, None, ); // 恶意 key 被清理为合法标识符 let (sql, _) = result.unwrap(); assert!( !sql.contains("DROP TABLE"), "SQL should not contain injection: {}", sql ); assert!( sql.contains("evil___DROP_TABLE__"), "Key should be sanitized: {}", sql ); } #[test] fn test_build_filtered_query_sql_with_search() { let (sql, values) = DynamicTableManager::build_filtered_query_sql( "plugin_test", Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap(), 20, 0, None, Some(("name,code".to_string(), "测试关键词".to_string())), None, None, ) .unwrap(); assert!(sql.contains("ILIKE"), "Expected ILIKE in SQL, got: {}", sql); // 验证搜索参数值包含 %...% if let Value::String(Some(s)) = &values[1] { assert!(s.contains("测试关键词"), "Search value should contain keyword"); assert!(s.starts_with('%'), "Search value should start with %"); } } #[test] fn test_build_filtered_query_sql_with_sort() { let (sql, _) = DynamicTableManager::build_filtered_query_sql( "plugin_test", Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap(), 20, 0, None, None, Some("name".to_string()), Some("asc".to_string()), ) .unwrap(); assert!( sql.contains("ORDER BY \"data\"->>'name' ASC"), "Expected sort clause, got: {}", sql ); } #[test] fn test_build_filtered_query_sql_default_sort() { let (sql, _) = DynamicTableManager::build_filtered_query_sql( "plugin_test", Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap(), 20, 0, None, None, None, None, ) .unwrap(); assert!( sql.contains("ORDER BY \"created_at\" DESC"), "Expected default sort, got: {}", sql ); } }