1126 lines
39 KiB
Rust
1126 lines
39 KiB
Rust
use sea_orm::{ConnectionTrait, DatabaseConnection, FromQueryResult, Statement, Value};
|
||
use uuid::Uuid;
|
||
|
||
use base64::{Engine, engine::general_purpose::STANDARD as BASE64};
|
||
|
||
use crate::error::{PluginError, PluginResult};
|
||
use crate::manifest::{PluginEntity, PluginFieldType};
|
||
|
||
/// 消毒标识符:只保留 ASCII 字母、数字、下划线,防止 SQL 注入
|
||
pub(crate) fn sanitize_identifier(input: &str) -> String {
|
||
input
|
||
.chars()
|
||
.map(|c| if c.is_ascii_alphanumeric() || c == '_' { c } else { '_' })
|
||
.collect()
|
||
}
|
||
|
||
/// 动态表管理器 — 处理插件动态创建/删除的数据库表
|
||
pub struct DynamicTableManager;
|
||
|
||
impl DynamicTableManager {
|
||
/// 生成动态表名: `plugin_{sanitized_id}_{sanitized_entity}`
|
||
pub fn table_name(plugin_id: &str, entity_name: &str) -> String {
|
||
let sanitized_id = sanitize_identifier(plugin_id);
|
||
let sanitized_entity = sanitize_identifier(entity_name);
|
||
format!("plugin_{}_{}", sanitized_id, sanitized_entity)
|
||
}
|
||
|
||
/// 生成包含 Generated Column 的建表 DDL
|
||
pub fn build_create_table_sql(plugin_id: &str, entity: &PluginEntity) -> String {
|
||
let table_name = Self::table_name(plugin_id, &entity.name);
|
||
|
||
let mut gen_cols = Vec::new();
|
||
let mut indexes = Vec::new();
|
||
|
||
for field in &entity.fields {
|
||
if !field.field_type.supports_generated_column() {
|
||
continue;
|
||
}
|
||
// 提取规则:unique / sortable / filterable
|
||
let should_extract = field.unique
|
||
|| field.sortable == Some(true)
|
||
|| field.filterable == Some(true)
|
||
|| (field.required
|
||
&& (field.sortable == Some(true) || field.filterable == Some(true)));
|
||
|
||
if !should_extract {
|
||
continue;
|
||
}
|
||
|
||
let col_name = format!("_f_{}", sanitize_identifier(&field.name));
|
||
let sql_type = field.field_type.generated_sql_type();
|
||
let expr = field.field_type.generated_expr(&sanitize_identifier(&field.name));
|
||
|
||
gen_cols.push(format!(
|
||
" \"{}\" {} GENERATED ALWAYS AS ({}) STORED",
|
||
col_name, sql_type, expr
|
||
));
|
||
|
||
// 索引策略
|
||
let col_idx = format!("{}_{}", sanitize_identifier(&table_name), col_name);
|
||
if field.unique {
|
||
indexes.push(format!(
|
||
"CREATE UNIQUE INDEX IF NOT EXISTS \"idx_{}_uniq\" ON \"{}\" (tenant_id, \"{}\") WHERE deleted_at IS NULL",
|
||
col_idx, table_name, col_name
|
||
));
|
||
} else {
|
||
indexes.push(format!(
|
||
"CREATE INDEX IF NOT EXISTS \"idx_{}\" ON \"{}\" (tenant_id, \"{}\") WHERE deleted_at IS NULL",
|
||
col_idx, table_name, col_name
|
||
));
|
||
}
|
||
}
|
||
|
||
// pg_trgm 索引
|
||
for field in &entity.fields {
|
||
if field.searchable == Some(true)
|
||
&& matches!(field.field_type, PluginFieldType::String)
|
||
{
|
||
let sf = sanitize_identifier(&field.name);
|
||
indexes.push(format!(
|
||
"CREATE INDEX IF NOT EXISTS \"idx_{}_{}_trgm\" ON \"{}\" USING GIN ((data->>'{}') gin_trgm_ops) WHERE deleted_at IS NULL",
|
||
sanitize_identifier(&table_name), sf, table_name, sf
|
||
));
|
||
}
|
||
}
|
||
|
||
// 覆盖索引
|
||
indexes.push(format!(
|
||
"CREATE INDEX IF NOT EXISTS \"idx_{}_tenant_cover\" ON \"{}\" (tenant_id, created_at DESC) INCLUDE (id, data, updated_at, version) WHERE deleted_at IS NULL",
|
||
sanitize_identifier(&table_name), table_name
|
||
));
|
||
|
||
let gen_cols_sql = if gen_cols.is_empty() {
|
||
String::new()
|
||
} else {
|
||
format!(",\n{}", gen_cols.join(",\n"))
|
||
};
|
||
|
||
format!(
|
||
"CREATE TABLE IF NOT EXISTS \"{}\" (\
|
||
\"id\" UUID PRIMARY KEY DEFAULT gen_random_uuid(), \
|
||
\"tenant_id\" UUID NOT NULL, \
|
||
\"data\" JSONB NOT NULL DEFAULT '{{}}'{gen_cols}, \
|
||
\"created_at\" TIMESTAMPTZ NOT NULL DEFAULT NOW(), \
|
||
\"updated_at\" TIMESTAMPTZ NOT NULL DEFAULT NOW(), \
|
||
\"created_by\" UUID, \
|
||
\"updated_by\" UUID, \
|
||
\"deleted_at\" TIMESTAMPTZ, \
|
||
\"version\" INT NOT NULL DEFAULT 1);\n\
|
||
{}",
|
||
table_name,
|
||
indexes.join(";\n"),
|
||
gen_cols = gen_cols_sql,
|
||
)
|
||
}
|
||
|
||
/// 创建动态表(使用 Generated Column + pg_trgm + 覆盖索引)
|
||
pub async fn create_table(
|
||
db: &DatabaseConnection,
|
||
plugin_id: &str,
|
||
entity: &PluginEntity,
|
||
) -> PluginResult<()> {
|
||
let ddl = Self::build_create_table_sql(plugin_id, entity);
|
||
for sql in ddl
|
||
.split(';')
|
||
.map(|s| s.trim())
|
||
.filter(|s| !s.is_empty())
|
||
{
|
||
tracing::info!(sql = %sql, "Executing DDL");
|
||
db.execute_unprepared(sql).await.map_err(|e| {
|
||
tracing::error!(sql = %sql, error = %e, "DDL execution failed");
|
||
PluginError::DatabaseError(e.to_string())
|
||
})?;
|
||
}
|
||
tracing::info!(
|
||
plugin_id = %plugin_id,
|
||
entity = %entity.name,
|
||
"Dynamic table created with Generated Columns"
|
||
);
|
||
Ok(())
|
||
}
|
||
|
||
/// 删除动态表
|
||
pub async fn drop_table(
|
||
db: &DatabaseConnection,
|
||
plugin_id: &str,
|
||
entity_name: &str,
|
||
) -> PluginResult<()> {
|
||
let table_name = Self::table_name(plugin_id, entity_name);
|
||
let sql = format!("DROP TABLE IF EXISTS \"{}\"", table_name);
|
||
db.execute(Statement::from_string(
|
||
sea_orm::DatabaseBackend::Postgres,
|
||
sql,
|
||
))
|
||
.await
|
||
.map_err(|e| PluginError::DatabaseError(e.to_string()))?;
|
||
tracing::info!(table = %table_name, "Dynamic table dropped");
|
||
Ok(())
|
||
}
|
||
|
||
/// 检查表是否存在
|
||
pub async fn table_exists(db: &DatabaseConnection, table_name: &str) -> PluginResult<bool> {
|
||
#[derive(FromQueryResult)]
|
||
struct ExistsResult {
|
||
exists: bool,
|
||
}
|
||
let result = ExistsResult::find_by_statement(Statement::from_sql_and_values(
|
||
sea_orm::DatabaseBackend::Postgres,
|
||
"SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = $1)",
|
||
[table_name.into()],
|
||
))
|
||
.one(db)
|
||
.await
|
||
.map_err(|e| PluginError::DatabaseError(e.to_string()))?;
|
||
|
||
Ok(result.map(|r| r.exists).unwrap_or(false))
|
||
}
|
||
|
||
/// 构建 INSERT SQL
|
||
pub fn build_insert_sql(
|
||
table_name: &str,
|
||
tenant_id: Uuid,
|
||
user_id: Uuid,
|
||
data: &serde_json::Value,
|
||
) -> (String, Vec<Value>) {
|
||
let id = Uuid::now_v7();
|
||
Self::build_insert_sql_with_id(table_name, id, tenant_id, user_id, data)
|
||
}
|
||
|
||
/// 构建 INSERT SQL(指定 ID)
|
||
pub fn build_insert_sql_with_id(
|
||
table_name: &str,
|
||
id: Uuid,
|
||
tenant_id: Uuid,
|
||
user_id: Uuid,
|
||
data: &serde_json::Value,
|
||
) -> (String, Vec<Value>) {
|
||
let sql = format!(
|
||
"INSERT INTO \"{}\" (id, tenant_id, data, created_by, updated_by, version) \
|
||
VALUES ($1, $2, $3::jsonb, $4, $5, 1) \
|
||
RETURNING id, tenant_id, data, created_at, updated_at, version",
|
||
table_name
|
||
);
|
||
let values = vec![
|
||
id.into(),
|
||
tenant_id.into(),
|
||
serde_json::to_string(data).unwrap_or_default().into(),
|
||
user_id.into(),
|
||
user_id.into(),
|
||
];
|
||
(sql, values)
|
||
}
|
||
|
||
/// 构建 SELECT SQL
|
||
pub fn build_query_sql(
|
||
table_name: &str,
|
||
tenant_id: Uuid,
|
||
limit: u64,
|
||
offset: u64,
|
||
) -> (String, Vec<Value>) {
|
||
let sql = format!(
|
||
"SELECT id, data, created_at, updated_at, version \
|
||
FROM \"{}\" \
|
||
WHERE tenant_id = $1 AND deleted_at IS NULL \
|
||
ORDER BY created_at DESC \
|
||
LIMIT $2 OFFSET $3",
|
||
table_name
|
||
);
|
||
let values = vec![tenant_id.into(), (limit as i64).into(), (offset as i64).into()];
|
||
(sql, values)
|
||
}
|
||
|
||
/// 构建 COUNT SQL
|
||
pub fn build_count_sql(table_name: &str, tenant_id: Uuid) -> (String, Vec<Value>) {
|
||
let sql = format!(
|
||
"SELECT COUNT(*) as count FROM \"{}\" WHERE tenant_id = $1 AND deleted_at IS NULL",
|
||
table_name
|
||
);
|
||
let values = vec![tenant_id.into()];
|
||
(sql, values)
|
||
}
|
||
|
||
/// 构建 UPDATE SQL(含乐观锁)
|
||
pub fn build_update_sql(
|
||
table_name: &str,
|
||
id: Uuid,
|
||
tenant_id: Uuid,
|
||
user_id: Uuid,
|
||
data: &serde_json::Value,
|
||
version: i32,
|
||
) -> (String, Vec<Value>) {
|
||
let sql = format!(
|
||
"UPDATE \"{}\" \
|
||
SET data = $1::jsonb, updated_at = NOW(), updated_by = $2, version = version + 1 \
|
||
WHERE id = $3 AND tenant_id = $4 AND version = $5 AND deleted_at IS NULL \
|
||
RETURNING id, data, created_at, updated_at, version",
|
||
table_name
|
||
);
|
||
let values = vec![
|
||
serde_json::to_string(data).unwrap_or_default().into(),
|
||
user_id.into(),
|
||
id.into(),
|
||
tenant_id.into(),
|
||
version.into(),
|
||
];
|
||
(sql, values)
|
||
}
|
||
|
||
/// 构建 DELETE SQL(软删除)
|
||
pub fn build_delete_sql(
|
||
table_name: &str,
|
||
id: Uuid,
|
||
tenant_id: Uuid,
|
||
) -> (String, Vec<Value>) {
|
||
let sql = format!(
|
||
"UPDATE \"{}\" \
|
||
SET deleted_at = NOW(), updated_at = NOW() \
|
||
WHERE id = $1 AND tenant_id = $2 AND deleted_at IS NULL",
|
||
table_name
|
||
);
|
||
let values = vec![id.into(), tenant_id.into()];
|
||
(sql, values)
|
||
}
|
||
|
||
/// 构建单条查询 SQL
|
||
pub fn build_get_by_id_sql(
|
||
table_name: &str,
|
||
id: Uuid,
|
||
tenant_id: Uuid,
|
||
) -> (String, Vec<Value>) {
|
||
let sql = format!(
|
||
"SELECT id, data, created_at, updated_at, version \
|
||
FROM \"{}\" \
|
||
WHERE id = $1 AND tenant_id = $2 AND deleted_at IS NULL",
|
||
table_name
|
||
);
|
||
let values = vec![id.into(), tenant_id.into()];
|
||
(sql, values)
|
||
}
|
||
|
||
/// 构建唯一索引 SQL(供测试验证)
|
||
pub fn build_unique_index_sql(table_name: &str, field_name: &str) -> String {
|
||
let sanitized_field = sanitize_identifier(field_name);
|
||
let idx_name = format!(
|
||
"idx_{}_{}_uniq",
|
||
sanitize_identifier(table_name),
|
||
sanitized_field
|
||
);
|
||
format!(
|
||
"CREATE UNIQUE INDEX IF NOT EXISTS \"{}\" ON \"{}\" (\"data\"->>'{}') WHERE \"deleted_at\" IS NULL",
|
||
idx_name, table_name, sanitized_field
|
||
)
|
||
}
|
||
|
||
/// 构建带过滤条件的 COUNT SQL
|
||
/// 复用 build_filtered_query_sql 的条件构建逻辑,但只做 COUNT
|
||
pub fn build_filtered_count_sql(
|
||
table_name: &str,
|
||
tenant_id: Uuid,
|
||
filter: Option<serde_json::Value>,
|
||
search: Option<(String, String)>, // (searchable_fields_csv, keyword)
|
||
) -> Result<(String, Vec<Value>), String> {
|
||
let mut conditions = vec![
|
||
format!("\"tenant_id\" = ${}", 1),
|
||
"\"deleted_at\" IS NULL".to_string(),
|
||
];
|
||
let mut param_idx = 2;
|
||
let mut values: Vec<Value> = vec![tenant_id.into()];
|
||
|
||
// 处理 filter(与 build_filtered_query_sql 保持一致)
|
||
if let Some(f) = filter {
|
||
if let Some(obj) = f.as_object() {
|
||
for (key, val) in obj {
|
||
let clean_key = sanitize_identifier(key);
|
||
if clean_key.is_empty() {
|
||
return Err(format!("无效的过滤字段名: {}", key));
|
||
}
|
||
conditions.push(format!("\"data\"->>'{}' = ${}", clean_key, param_idx));
|
||
values.push(Value::String(Some(Box::new(
|
||
val.as_str().unwrap_or("").to_string(),
|
||
))));
|
||
param_idx += 1;
|
||
}
|
||
}
|
||
}
|
||
|
||
// 处理 search(与 build_filtered_query_sql 保持一致)
|
||
if let Some((fields_csv, keyword)) = search {
|
||
let escaped = keyword.replace('%', "\\%").replace('_', "\\_");
|
||
let fields: Vec<&str> = fields_csv.split(',').collect();
|
||
let search_param_idx = param_idx;
|
||
let search_conditions: Vec<String> = fields
|
||
.iter()
|
||
.map(|f| {
|
||
let clean = sanitize_identifier(f.trim());
|
||
format!("\"data\"->>'{}' ILIKE ${}", clean, search_param_idx)
|
||
})
|
||
.collect();
|
||
conditions.push(format!("({})", search_conditions.join(" OR ")));
|
||
values.push(Value::String(Some(Box::new(format!("%{}%", escaped)))));
|
||
}
|
||
|
||
let sql = format!(
|
||
"SELECT COUNT(*) as count FROM \"{}\" WHERE {}",
|
||
table_name,
|
||
conditions.join(" AND "),
|
||
);
|
||
|
||
Ok((sql, values))
|
||
}
|
||
|
||
/// 构建聚合查询 SQL — 按 JSONB 字段分组计数
|
||
/// SELECT data->>'group_field' as key, COUNT(*) as count
|
||
/// FROM table WHERE tenant_id = $1 AND deleted_at IS NULL [AND filter...]
|
||
/// GROUP BY data->>'group_field' ORDER BY count DESC
|
||
pub fn build_aggregate_sql(
|
||
table_name: &str,
|
||
tenant_id: Uuid,
|
||
group_by_field: &str,
|
||
filter: Option<serde_json::Value>,
|
||
) -> Result<(String, Vec<Value>), String> {
|
||
let clean_group = sanitize_identifier(group_by_field);
|
||
if clean_group.is_empty() {
|
||
return Err(format!("无效的分组字段名: {}", group_by_field));
|
||
}
|
||
|
||
let mut conditions = vec![
|
||
format!("\"tenant_id\" = ${}", 1),
|
||
"\"deleted_at\" IS NULL".to_string(),
|
||
];
|
||
let mut param_idx = 2;
|
||
let mut values: Vec<Value> = vec![tenant_id.into()];
|
||
|
||
// 处理 filter
|
||
if let Some(f) = filter {
|
||
if let Some(obj) = f.as_object() {
|
||
for (key, val) in obj {
|
||
let clean_key = sanitize_identifier(key);
|
||
if clean_key.is_empty() {
|
||
return Err(format!("无效的过滤字段名: {}", key));
|
||
}
|
||
conditions.push(format!("\"data\"->>'{}' = ${}", clean_key, param_idx));
|
||
values.push(Value::String(Some(Box::new(
|
||
val.as_str().unwrap_or("").to_string(),
|
||
))));
|
||
param_idx += 1;
|
||
}
|
||
}
|
||
}
|
||
|
||
let sql = format!(
|
||
"SELECT \"data\"->>'{}' as key, COUNT(*) as count \
|
||
FROM \"{}\" \
|
||
WHERE {} \
|
||
GROUP BY \"data\"->>'{}' \
|
||
ORDER BY count DESC",
|
||
clean_group,
|
||
table_name,
|
||
conditions.join(" AND "),
|
||
clean_group,
|
||
);
|
||
|
||
Ok((sql, values))
|
||
}
|
||
|
||
/// 构建带过滤条件的查询 SQL
|
||
pub fn build_filtered_query_sql(
|
||
table_name: &str,
|
||
tenant_id: Uuid,
|
||
limit: u64,
|
||
offset: u64,
|
||
filter: Option<serde_json::Value>,
|
||
search: Option<(String, String)>, // (searchable_fields_csv, keyword)
|
||
sort_by: Option<String>,
|
||
sort_order: Option<String>,
|
||
) -> Result<(String, Vec<Value>), String> {
|
||
let mut conditions = vec![
|
||
format!("\"tenant_id\" = ${}", 1),
|
||
"\"deleted_at\" IS NULL".to_string(),
|
||
];
|
||
let mut param_idx = 2;
|
||
let mut values: Vec<Value> = vec![tenant_id.into()];
|
||
|
||
// 处理 filter
|
||
if let Some(f) = filter {
|
||
if let Some(obj) = f.as_object() {
|
||
for (key, val) in obj {
|
||
let clean_key = sanitize_identifier(key);
|
||
if clean_key.is_empty() {
|
||
return Err(format!("无效的过滤字段名: {}", key));
|
||
}
|
||
conditions.push(format!("\"data\"->>'{}' = ${}", clean_key, param_idx));
|
||
values.push(Value::String(Some(Box::new(
|
||
val.as_str().unwrap_or("").to_string(),
|
||
))));
|
||
param_idx += 1;
|
||
}
|
||
}
|
||
}
|
||
|
||
// 处理 search — 所有 searchable 字段共享同一个 ILIKE 参数
|
||
if let Some((fields_csv, keyword)) = search {
|
||
let escaped = keyword.replace('%', "\\%").replace('_', "\\_");
|
||
let fields: Vec<&str> = fields_csv.split(',').collect();
|
||
let search_param_idx = param_idx;
|
||
let search_conditions: Vec<String> = fields
|
||
.iter()
|
||
.map(|f| {
|
||
let clean = sanitize_identifier(f.trim());
|
||
format!("\"data\"->>'{}' ILIKE ${}", clean, search_param_idx)
|
||
})
|
||
.collect();
|
||
conditions.push(format!("({})", search_conditions.join(" OR ")));
|
||
values.push(Value::String(Some(Box::new(format!("%{}%", escaped)))));
|
||
param_idx += 1;
|
||
}
|
||
|
||
// 处理 sort
|
||
let order_clause = if let Some(sb) = sort_by {
|
||
let clean = sanitize_identifier(&sb);
|
||
if clean.is_empty() {
|
||
return Err(format!("无效的排序字段名: {}", sb));
|
||
}
|
||
let dir = match sort_order.as_deref() {
|
||
Some("asc") | Some("ASC") => "ASC",
|
||
_ => "DESC",
|
||
};
|
||
format!("ORDER BY \"data\"->>'{}' {}", clean, dir)
|
||
} else {
|
||
"ORDER BY \"created_at\" DESC".to_string()
|
||
};
|
||
|
||
let sql = format!(
|
||
"SELECT id, data, created_at, updated_at, version FROM \"{}\" WHERE {} {} LIMIT ${} OFFSET ${}",
|
||
table_name,
|
||
conditions.join(" AND "),
|
||
order_clause,
|
||
param_idx,
|
||
param_idx + 1,
|
||
);
|
||
values.push((limit as i64).into());
|
||
values.push((offset as i64).into());
|
||
|
||
Ok((sql, values))
|
||
}
|
||
|
||
/// 返回字段引用函数 — Generated Column 存在时用 _f_{name}
|
||
pub fn field_reference_fn(generated_fields: &[String]) -> impl Fn(&str) -> String + '_ {
|
||
move |field_name: &str| {
|
||
let clean = sanitize_identifier(field_name);
|
||
if generated_fields.contains(&clean) {
|
||
format!("\"_f_{}\"", clean)
|
||
} else {
|
||
format!("\"data\"->>'{}'", clean)
|
||
}
|
||
}
|
||
}
|
||
|
||
/// 扩展版查询构建 — 支持 Generated Column 路由
|
||
pub fn build_filtered_query_sql_ex(
|
||
table_name: &str,
|
||
tenant_id: Uuid,
|
||
limit: u64,
|
||
offset: u64,
|
||
filter: Option<serde_json::Value>,
|
||
search: Option<(String, String)>,
|
||
sort_by: Option<String>,
|
||
sort_order: Option<String>,
|
||
generated_fields: &[String],
|
||
) -> Result<(String, Vec<Value>), String> {
|
||
let ref_fn = Self::field_reference_fn(generated_fields);
|
||
|
||
let mut conditions = vec![
|
||
format!("\"tenant_id\" = ${}", 1),
|
||
"\"deleted_at\" IS NULL".to_string(),
|
||
];
|
||
let mut param_idx = 2;
|
||
let mut values: Vec<Value> = vec![tenant_id.into()];
|
||
|
||
// filter
|
||
if let Some(f) = filter {
|
||
if let Some(obj) = f.as_object() {
|
||
for (key, val) in obj {
|
||
let clean_key = sanitize_identifier(key);
|
||
if clean_key.is_empty() {
|
||
return Err(format!("无效的过滤字段名: {}", key));
|
||
}
|
||
conditions.push(format!("{} = ${}", ref_fn(&clean_key), param_idx));
|
||
values.push(Value::String(Some(Box::new(
|
||
val.as_str().unwrap_or("").to_string(),
|
||
))));
|
||
param_idx += 1;
|
||
}
|
||
}
|
||
}
|
||
|
||
// search
|
||
if let Some((fields_csv, keyword)) = search {
|
||
let escaped = keyword.replace('%', "\\%").replace('_', "\\_");
|
||
let fields: Vec<&str> = fields_csv.split(',').collect();
|
||
let search_param_idx = param_idx;
|
||
let search_conditions: Vec<String> = fields
|
||
.iter()
|
||
.map(|f| {
|
||
let clean = sanitize_identifier(f.trim());
|
||
format!("\"data\"->>'{}' ILIKE ${}", clean, search_param_idx)
|
||
})
|
||
.collect();
|
||
conditions.push(format!("({})", search_conditions.join(" OR ")));
|
||
values.push(Value::String(Some(Box::new(format!("%{}%", escaped)))));
|
||
param_idx += 1;
|
||
}
|
||
|
||
// sort
|
||
let order_clause = if let Some(sb) = sort_by {
|
||
let clean = sanitize_identifier(&sb);
|
||
if clean.is_empty() {
|
||
return Err(format!("无效的排序字段名: {}", sb));
|
||
}
|
||
let dir = match sort_order.as_deref() {
|
||
Some("asc") | Some("ASC") => "ASC",
|
||
_ => "DESC",
|
||
};
|
||
format!("ORDER BY {} {}", ref_fn(&clean), dir)
|
||
} else {
|
||
"ORDER BY \"created_at\" DESC".to_string()
|
||
};
|
||
|
||
let sql = format!(
|
||
"SELECT id, data, created_at, updated_at, version FROM \"{}\" WHERE {} {} LIMIT ${} OFFSET ${}",
|
||
table_name,
|
||
conditions.join(" AND "),
|
||
order_clause,
|
||
param_idx,
|
||
param_idx + 1,
|
||
);
|
||
values.push((limit as i64).into());
|
||
values.push((offset as i64).into());
|
||
|
||
Ok((sql, values))
|
||
}
|
||
|
||
/// 编码游标
|
||
pub fn encode_cursor(values: &[String], id: &Uuid) -> String {
|
||
let obj = serde_json::json!({
|
||
"v": values,
|
||
"id": id.to_string(),
|
||
});
|
||
BASE64.encode(obj.to_string())
|
||
}
|
||
|
||
/// 解码游标
|
||
pub fn decode_cursor(cursor: &str) -> Result<(Vec<String>, Uuid), String> {
|
||
let json_str = BASE64
|
||
.decode(cursor)
|
||
.map_err(|e| format!("游标 Base64 解码失败: {}", e))?;
|
||
let obj: serde_json::Value = serde_json::from_slice(&json_str)
|
||
.map_err(|e| format!("游标 JSON 解析失败: {}", e))?;
|
||
let values = obj["v"]
|
||
.as_array()
|
||
.ok_or("游标缺少 v 字段")?
|
||
.iter()
|
||
.map(|v| v.as_str().unwrap_or("").to_string())
|
||
.collect();
|
||
let id = obj["id"].as_str().ok_or("游标缺少 id 字段")?;
|
||
let id = Uuid::parse_str(id).map_err(|e| format!("游标 id 解析失败: {}", e))?;
|
||
Ok((values, id))
|
||
}
|
||
|
||
/// 构建 Keyset 分页 SQL
|
||
pub fn build_keyset_query_sql(
|
||
table_name: &str,
|
||
tenant_id: Uuid,
|
||
limit: u64,
|
||
cursor: Option<String>,
|
||
sort_column: Option<String>,
|
||
sort_direction: &str,
|
||
generated_fields: &[String],
|
||
) -> Result<(String, Vec<Value>), String> {
|
||
let dir = match sort_direction {
|
||
"ASC" => "ASC",
|
||
_ => "DESC",
|
||
};
|
||
let ref_fn = Self::field_reference_fn(generated_fields);
|
||
let sort_col = sort_column
|
||
.as_deref()
|
||
.map(|s| ref_fn(s))
|
||
.unwrap_or("\"created_at\"".to_string());
|
||
|
||
let mut values: Vec<Value> = vec![tenant_id.into()];
|
||
let mut param_idx = 2;
|
||
|
||
let cursor_condition = if let Some(c) = cursor {
|
||
let (sort_vals, cursor_id) = Self::decode_cursor(&c)?;
|
||
let cond = format!(
|
||
"ROW({}, \"id\") {} (${}, ${})",
|
||
sort_col,
|
||
if dir == "ASC" { ">" } else { "<" },
|
||
param_idx,
|
||
param_idx + 1
|
||
);
|
||
values.push(Value::String(Some(Box::new(
|
||
sort_vals.first().cloned().unwrap_or_default(),
|
||
))));
|
||
values.push(cursor_id.into());
|
||
param_idx += 2;
|
||
Some(cond)
|
||
} else {
|
||
None
|
||
};
|
||
|
||
let where_extra = cursor_condition
|
||
.map(|c| format!(" AND {}", c))
|
||
.unwrap_or_default();
|
||
|
||
let sql = format!(
|
||
"SELECT id, data, created_at, updated_at, version FROM \"{}\" \
|
||
WHERE \"tenant_id\" = $1 AND \"deleted_at\" IS NULL{} \
|
||
ORDER BY {}, \"id\" {} LIMIT ${}",
|
||
table_name, where_extra, sort_col, dir, param_idx,
|
||
);
|
||
values.push((limit as i64).into());
|
||
|
||
Ok((sql, values))
|
||
}
|
||
}
|
||
|
||
#[cfg(test)]
|
||
mod tests {
|
||
use super::*;
|
||
|
||
#[test]
|
||
fn test_unique_index_sql_uses_create_unique_index() {
|
||
let sql = DynamicTableManager::build_unique_index_sql("plugin_test", "code");
|
||
assert!(
|
||
sql.contains("CREATE UNIQUE INDEX"),
|
||
"Expected UNIQUE index, got: {}",
|
||
sql
|
||
);
|
||
}
|
||
|
||
#[test]
|
||
fn test_build_filtered_query_sql_with_filter() {
|
||
let (sql, values) = DynamicTableManager::build_filtered_query_sql(
|
||
"plugin_test_customer",
|
||
Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap(),
|
||
20,
|
||
0,
|
||
Some(serde_json::json!({"customer_id": "abc-123"})),
|
||
None,
|
||
None,
|
||
None,
|
||
)
|
||
.unwrap();
|
||
assert!(
|
||
sql.contains("\"data\"->>'customer_id' ="),
|
||
"Expected filter in SQL, got: {}",
|
||
sql
|
||
);
|
||
assert!(sql.contains("tenant_id"), "Expected tenant_id filter");
|
||
// 验证参数值
|
||
assert_eq!(values.len(), 4); // tenant_id + filter_value + limit + offset
|
||
}
|
||
|
||
#[test]
|
||
fn test_build_filtered_query_sql_sanitizes_keys() {
|
||
let result = DynamicTableManager::build_filtered_query_sql(
|
||
"plugin_test",
|
||
Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap(),
|
||
20,
|
||
0,
|
||
Some(serde_json::json!({"evil'; DROP TABLE--": "value"})),
|
||
None,
|
||
None,
|
||
None,
|
||
);
|
||
// 恶意 key 被清理为合法标识符
|
||
let (sql, _) = result.unwrap();
|
||
assert!(
|
||
!sql.contains("DROP TABLE"),
|
||
"SQL should not contain injection: {}",
|
||
sql
|
||
);
|
||
assert!(
|
||
sql.contains("evil___DROP_TABLE__"),
|
||
"Key should be sanitized: {}",
|
||
sql
|
||
);
|
||
}
|
||
|
||
#[test]
|
||
fn test_build_filtered_query_sql_with_search() {
|
||
let (sql, values) = DynamicTableManager::build_filtered_query_sql(
|
||
"plugin_test",
|
||
Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap(),
|
||
20,
|
||
0,
|
||
None,
|
||
Some(("name,code".to_string(), "测试关键词".to_string())),
|
||
None,
|
||
None,
|
||
)
|
||
.unwrap();
|
||
assert!(sql.contains("ILIKE"), "Expected ILIKE in SQL, got: {}", sql);
|
||
// 验证搜索参数值包含 %...%
|
||
if let Value::String(Some(s)) = &values[1] {
|
||
assert!(s.contains("测试关键词"), "Search value should contain keyword");
|
||
assert!(s.starts_with('%'), "Search value should start with %");
|
||
}
|
||
}
|
||
|
||
#[test]
|
||
fn test_build_filtered_query_sql_with_sort() {
|
||
let (sql, _) = DynamicTableManager::build_filtered_query_sql(
|
||
"plugin_test",
|
||
Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap(),
|
||
20,
|
||
0,
|
||
None,
|
||
None,
|
||
Some("name".to_string()),
|
||
Some("asc".to_string()),
|
||
)
|
||
.unwrap();
|
||
assert!(
|
||
sql.contains("ORDER BY \"data\"->>'name' ASC"),
|
||
"Expected sort clause, got: {}",
|
||
sql
|
||
);
|
||
}
|
||
|
||
#[test]
|
||
fn test_build_filtered_query_sql_default_sort() {
|
||
let (sql, _) = DynamicTableManager::build_filtered_query_sql(
|
||
"plugin_test",
|
||
Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap(),
|
||
20,
|
||
0,
|
||
None,
|
||
None,
|
||
None,
|
||
None,
|
||
)
|
||
.unwrap();
|
||
assert!(
|
||
sql.contains("ORDER BY \"created_at\" DESC"),
|
||
"Expected default sort, got: {}",
|
||
sql
|
||
);
|
||
}
|
||
|
||
// ===== build_filtered_count_sql 测试 =====
|
||
|
||
#[test]
|
||
fn test_build_filtered_count_sql_basic() {
|
||
let (sql, values) = DynamicTableManager::build_filtered_count_sql(
|
||
"plugin_test",
|
||
Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap(),
|
||
None,
|
||
None,
|
||
)
|
||
.unwrap();
|
||
assert!(sql.contains("COUNT(*)"), "Expected COUNT(*), got: {}", sql);
|
||
assert!(sql.contains("tenant_id"), "Expected tenant_id filter");
|
||
assert!(sql.contains("deleted_at"), "Expected soft delete filter");
|
||
assert_eq!(values.len(), 1); // 仅 tenant_id
|
||
}
|
||
|
||
#[test]
|
||
fn test_build_filtered_count_sql_with_filter() {
|
||
let (sql, values) = DynamicTableManager::build_filtered_count_sql(
|
||
"plugin_test_customer",
|
||
Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap(),
|
||
Some(serde_json::json!({"status": "active"})),
|
||
None,
|
||
)
|
||
.unwrap();
|
||
assert!(sql.contains("\"data\"->>'status' ="), "Expected filter, got: {}", sql);
|
||
assert_eq!(values.len(), 2); // tenant_id + filter_value
|
||
}
|
||
|
||
#[test]
|
||
fn test_build_filtered_count_sql_with_search() {
|
||
let (sql, values) = DynamicTableManager::build_filtered_count_sql(
|
||
"plugin_test",
|
||
Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap(),
|
||
None,
|
||
Some(("name,code".to_string(), "搜索词".to_string())),
|
||
)
|
||
.unwrap();
|
||
assert!(sql.contains("ILIKE"), "Expected ILIKE, got: {}", sql);
|
||
assert_eq!(values.len(), 2); // tenant_id + search_param
|
||
}
|
||
|
||
#[test]
|
||
fn test_build_filtered_count_sql_no_limit_offset() {
|
||
// COUNT SQL 不应包含 LIMIT/OFFSET
|
||
let (sql, _) = DynamicTableManager::build_filtered_count_sql(
|
||
"plugin_test",
|
||
Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap(),
|
||
None,
|
||
None,
|
||
)
|
||
.unwrap();
|
||
assert!(!sql.contains("LIMIT"), "COUNT SQL 不应包含 LIMIT");
|
||
assert!(!sql.contains("OFFSET"), "COUNT SQL 不应包含 OFFSET");
|
||
}
|
||
|
||
// ===== build_aggregate_sql 测试 =====
|
||
|
||
#[test]
|
||
fn test_build_aggregate_sql_basic() {
|
||
let (sql, values) = DynamicTableManager::build_aggregate_sql(
|
||
"plugin_test_customer",
|
||
Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap(),
|
||
"status",
|
||
None,
|
||
)
|
||
.unwrap();
|
||
assert!(sql.contains("GROUP BY"), "Expected GROUP BY, got: {}", sql);
|
||
assert!(sql.contains("\"data\"->>'status'"), "Expected group field, got: {}", sql);
|
||
assert!(sql.contains("ORDER BY count DESC"), "Expected ORDER BY count DESC, got: {}", sql);
|
||
assert_eq!(values.len(), 1); // 仅 tenant_id
|
||
}
|
||
|
||
#[test]
|
||
fn test_build_aggregate_sql_with_filter() {
|
||
let (sql, values) = DynamicTableManager::build_aggregate_sql(
|
||
"plugin_test_customer",
|
||
Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap(),
|
||
"region",
|
||
Some(serde_json::json!({"status": "active"})),
|
||
)
|
||
.unwrap();
|
||
assert!(sql.contains("\"data\"->>'region'"), "Expected group field, got: {}", sql);
|
||
assert!(sql.contains("\"data\"->>'status' ="), "Expected filter, got: {}", sql);
|
||
assert_eq!(values.len(), 2); // tenant_id + filter_value
|
||
}
|
||
|
||
#[test]
|
||
fn test_build_aggregate_sql_sanitizes_group_field() {
|
||
let result = DynamicTableManager::build_aggregate_sql(
|
||
"plugin_test",
|
||
Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap(),
|
||
"evil'; DROP TABLE--",
|
||
None,
|
||
);
|
||
let (sql, _) = result.unwrap();
|
||
assert!(!sql.contains("DROP TABLE"), "SQL 不应包含注入: {}", sql);
|
||
assert!(sql.contains("evil___DROP_TABLE__"), "字段名应被清理: {}", sql);
|
||
}
|
||
|
||
#[test]
|
||
fn test_build_aggregate_sql_empty_field_rejected() {
|
||
let result = DynamicTableManager::build_aggregate_sql(
|
||
"plugin_test",
|
||
Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap(),
|
||
"",
|
||
None,
|
||
);
|
||
assert!(result.is_err(), "空字段名应被拒绝");
|
||
}
|
||
|
||
// ===== build_create_table_sql (Generated Column) 测试 =====
|
||
|
||
#[test]
|
||
fn test_build_create_table_sql_with_generated_columns() {
|
||
use crate::manifest::{PluginEntity, PluginField, PluginFieldType};
|
||
|
||
let entity = PluginEntity {
|
||
name: "customer".to_string(),
|
||
display_name: "客户".to_string(),
|
||
fields: vec![
|
||
PluginField {
|
||
name: "code".to_string(),
|
||
field_type: PluginFieldType::String,
|
||
required: true,
|
||
unique: true,
|
||
display_name: Some("编码".to_string()),
|
||
searchable: Some(true),
|
||
..PluginField::default_for_field()
|
||
},
|
||
PluginField {
|
||
name: "level".to_string(),
|
||
field_type: PluginFieldType::String,
|
||
filterable: Some(true),
|
||
display_name: Some("等级".to_string()),
|
||
..PluginField::default_for_field()
|
||
},
|
||
PluginField {
|
||
name: "sort_order".to_string(),
|
||
field_type: PluginFieldType::Integer,
|
||
sortable: Some(true),
|
||
display_name: Some("排序".to_string()),
|
||
..PluginField::default_for_field()
|
||
},
|
||
],
|
||
indexes: vec![],
|
||
};
|
||
|
||
let sql = DynamicTableManager::build_create_table_sql("erp_crm", &entity);
|
||
assert!(
|
||
sql.contains("_f_code"),
|
||
"应包含 _f_code Generated Column"
|
||
);
|
||
assert!(
|
||
sql.contains("_f_level"),
|
||
"应包含 _f_level Generated Column"
|
||
);
|
||
assert!(
|
||
sql.contains("_f_sort_order"),
|
||
"应包含 _f_sort_order Generated Column"
|
||
);
|
||
assert!(
|
||
sql.contains("GENERATED ALWAYS AS"),
|
||
"应包含 GENERATED ALWAYS AS"
|
||
);
|
||
assert!(
|
||
sql.contains("::INTEGER"),
|
||
"Integer 字段应有类型转换"
|
||
);
|
||
}
|
||
|
||
#[test]
|
||
fn test_build_create_table_sql_pg_trgm_search_index() {
|
||
use crate::manifest::{PluginEntity, PluginField, PluginFieldType};
|
||
|
||
let entity = PluginEntity {
|
||
name: "customer".to_string(),
|
||
display_name: "客户".to_string(),
|
||
fields: vec![PluginField {
|
||
name: "name".to_string(),
|
||
field_type: PluginFieldType::String,
|
||
searchable: Some(true),
|
||
display_name: Some("名称".to_string()),
|
||
..PluginField::default_for_field()
|
||
}],
|
||
indexes: vec![],
|
||
};
|
||
|
||
let sql = DynamicTableManager::build_create_table_sql("erp_crm", &entity);
|
||
assert!(
|
||
sql.contains("gin_trgm_ops"),
|
||
"searchable 字段应使用 pg_trgm GIN 索引"
|
||
);
|
||
}
|
||
|
||
// ===== field_reference_fn + build_filtered_query_sql_ex 测试 =====
|
||
|
||
#[test]
|
||
fn test_field_reference_uses_generated_column() {
|
||
let generated_fields = vec![
|
||
"code".to_string(),
|
||
"status".to_string(),
|
||
"level".to_string(),
|
||
];
|
||
let ref_fn = DynamicTableManager::field_reference_fn(&generated_fields);
|
||
assert_eq!(ref_fn("code"), "\"_f_code\"");
|
||
assert_eq!(ref_fn("status"), "\"_f_status\"");
|
||
assert_eq!(ref_fn("name"), "\"data\"->>'name'");
|
||
assert_eq!(ref_fn("remark"), "\"data\"->>'remark'");
|
||
}
|
||
|
||
#[test]
|
||
fn test_filtered_query_uses_generated_column_for_sort() {
|
||
let generated_fields = vec!["code".to_string(), "level".to_string()];
|
||
let (sql, _) = DynamicTableManager::build_filtered_query_sql_ex(
|
||
"plugin_test",
|
||
Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap(),
|
||
20,
|
||
0,
|
||
None,
|
||
None,
|
||
Some("level".to_string()),
|
||
Some("asc".to_string()),
|
||
&generated_fields,
|
||
)
|
||
.unwrap();
|
||
assert!(
|
||
sql.contains("ORDER BY \"_f_level\" ASC"),
|
||
"排序应使用 Generated Column,got: {}",
|
||
sql
|
||
);
|
||
}
|
||
|
||
#[test]
|
||
fn test_filtered_query_uses_generated_column_for_filter() {
|
||
let generated_fields = vec!["status".to_string()];
|
||
let (sql, _) = DynamicTableManager::build_filtered_query_sql_ex(
|
||
"plugin_test",
|
||
Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap(),
|
||
20,
|
||
0,
|
||
Some(serde_json::json!({"status": "active"})),
|
||
None,
|
||
None,
|
||
None,
|
||
&generated_fields,
|
||
)
|
||
.unwrap();
|
||
assert!(
|
||
sql.contains("\"_f_status\" = $"),
|
||
"过滤应使用 Generated Column,got: {}",
|
||
sql
|
||
);
|
||
}
|
||
|
||
// ===== Keyset Pagination 测试 =====
|
||
|
||
#[test]
|
||
fn test_keyset_cursor_encode_decode() {
|
||
let cursor = DynamicTableManager::encode_cursor(
|
||
&["测试值".to_string()],
|
||
&Uuid::parse_str("00000000-0000-0000-0000-000000000042").unwrap(),
|
||
);
|
||
let decoded = DynamicTableManager::decode_cursor(&cursor).unwrap();
|
||
assert_eq!(decoded.0, vec!["测试值"]);
|
||
assert_eq!(
|
||
decoded.1,
|
||
Uuid::parse_str("00000000-0000-0000-0000-000000000042").unwrap()
|
||
);
|
||
}
|
||
|
||
#[test]
|
||
fn test_keyset_sql_first_page() {
|
||
let (sql, _) = DynamicTableManager::build_keyset_query_sql(
|
||
"plugin_test",
|
||
Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap(),
|
||
20,
|
||
None,
|
||
Some("_f_name".to_string()),
|
||
"ASC",
|
||
&[],
|
||
)
|
||
.unwrap();
|
||
assert!(sql.contains("ORDER BY"), "应有 ORDER BY");
|
||
assert!(!sql.contains("ROW("), "第一页不应有 cursor 条件");
|
||
}
|
||
|
||
#[test]
|
||
fn test_keyset_sql_with_cursor() {
|
||
let cursor = DynamicTableManager::encode_cursor(
|
||
&["Alice".to_string()],
|
||
&Uuid::parse_str("00000000-0000-0000-0000-000000000100").unwrap(),
|
||
);
|
||
let (sql, values) = DynamicTableManager::build_keyset_query_sql(
|
||
"plugin_test",
|
||
Uuid::parse_str("00000000-0000-0000-0000-000000000001").unwrap(),
|
||
20,
|
||
Some(cursor),
|
||
Some("_f_name".to_string()),
|
||
"ASC",
|
||
&[],
|
||
)
|
||
.unwrap();
|
||
assert!(
|
||
sql.contains("ROW("),
|
||
"cursor 条件应使用 ROW 比较"
|
||
);
|
||
assert!(
|
||
values.len() >= 4,
|
||
"应有 tenant_id + cursor_val + cursor_id + limit"
|
||
);
|
||
}
|
||
}
|