feat(saas): Phase 4 — 配置迁移模块
- 配置项 CRUD (列表/详情/创建/更新/删除) - 配置分析端点 (按类别汇总, SaaS 托管统计) - 13 个默认配置项种子数据 (server/agent/memory/llm) - 配置同步协议 (客户端→SaaS, SaaS 优先策略) - 同步日志记录和查询 - 3 个新集成测试覆盖配置迁移端点
This commit is contained in:
272
crates/zclaw-saas/src/migration/service.rs
Normal file
272
crates/zclaw-saas/src/migration/service.rs
Normal file
@@ -0,0 +1,272 @@
|
||||
//! 配置迁移业务逻辑
|
||||
|
||||
use sqlx::SqlitePool;
|
||||
use crate::error::{SaasError, SaasResult};
|
||||
use super::types::*;
|
||||
|
||||
// ============ Config Items ============
|
||||
|
||||
pub async fn list_config_items(
|
||||
db: &SqlitePool, query: &ConfigQuery,
|
||||
) -> SaasResult<Vec<ConfigItemInfo>> {
|
||||
let sql = match (&query.category, &query.source) {
|
||||
(Some(_), Some(_)) => {
|
||||
"SELECT id, category, key_path, value_type, current_value, default_value, source, description, requires_restart, created_at, updated_at
|
||||
FROM config_items WHERE category = ?1 AND source = ?2 ORDER BY category, key_path"
|
||||
}
|
||||
(Some(_), None) => {
|
||||
"SELECT id, category, key_path, value_type, current_value, default_value, source, description, requires_restart, created_at, updated_at
|
||||
FROM config_items WHERE category = ?1 ORDER BY key_path"
|
||||
}
|
||||
(None, Some(_)) => {
|
||||
"SELECT id, category, key_path, value_type, current_value, default_value, source, description, requires_restart, created_at, updated_at
|
||||
FROM config_items WHERE source = ?1 ORDER BY category, key_path"
|
||||
}
|
||||
(None, None) => {
|
||||
"SELECT id, category, key_path, value_type, current_value, default_value, source, description, requires_restart, created_at, updated_at
|
||||
FROM config_items ORDER BY category, key_path"
|
||||
}
|
||||
};
|
||||
|
||||
let mut query_builder = sqlx::query_as::<_, (String, String, String, String, Option<String>, Option<String>, String, Option<String>, bool, String, String)>(sql);
|
||||
|
||||
if let Some(cat) = &query.category {
|
||||
query_builder = query_builder.bind(cat);
|
||||
}
|
||||
if let Some(src) = &query.source {
|
||||
query_builder = query_builder.bind(src);
|
||||
}
|
||||
|
||||
let rows = query_builder.fetch_all(db).await?;
|
||||
Ok(rows.into_iter().map(|(id, category, key_path, value_type, current_value, default_value, source, description, requires_restart, created_at, updated_at)| {
|
||||
ConfigItemInfo { id, category, key_path, value_type, current_value, default_value, source, description, requires_restart, created_at, updated_at }
|
||||
}).collect())
|
||||
}
|
||||
|
||||
pub async fn get_config_item(db: &SqlitePool, item_id: &str) -> SaasResult<ConfigItemInfo> {
|
||||
let row: Option<(String, String, String, String, Option<String>, Option<String>, String, Option<String>, bool, String, String)> =
|
||||
sqlx::query_as(
|
||||
"SELECT id, category, key_path, value_type, current_value, default_value, source, description, requires_restart, created_at, updated_at
|
||||
FROM config_items WHERE id = ?1"
|
||||
)
|
||||
.bind(item_id)
|
||||
.fetch_optional(db)
|
||||
.await?;
|
||||
|
||||
let (id, category, key_path, value_type, current_value, default_value, source, description, requires_restart, created_at, updated_at) =
|
||||
row.ok_or_else(|| SaasError::NotFound(format!("配置项 {} 不存在", item_id)))?;
|
||||
|
||||
Ok(ConfigItemInfo { id, category, key_path, value_type, current_value, default_value, source, description, requires_restart, created_at, updated_at })
|
||||
}
|
||||
|
||||
pub async fn create_config_item(
|
||||
db: &SqlitePool, req: &CreateConfigItemRequest,
|
||||
) -> SaasResult<ConfigItemInfo> {
|
||||
let id = uuid::Uuid::new_v4().to_string();
|
||||
let now = chrono::Utc::now().to_rfc3339();
|
||||
let source = req.source.as_deref().unwrap_or("local");
|
||||
let requires_restart = req.requires_restart.unwrap_or(false);
|
||||
|
||||
// 检查唯一性
|
||||
let existing: Option<(String,)> = sqlx::query_as(
|
||||
"SELECT id FROM config_items WHERE category = ?1 AND key_path = ?2"
|
||||
)
|
||||
.bind(&req.category).bind(&req.key_path)
|
||||
.fetch_optional(db).await?;
|
||||
|
||||
if existing.is_some() {
|
||||
return Err(SaasError::AlreadyExists(format!(
|
||||
"配置项 {}:{} 已存在", req.category, req.key_path
|
||||
)));
|
||||
}
|
||||
|
||||
sqlx::query(
|
||||
"INSERT INTO config_items (id, category, key_path, value_type, current_value, default_value, source, description, requires_restart, created_at, updated_at)
|
||||
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?10)"
|
||||
)
|
||||
.bind(&id).bind(&req.category).bind(&req.key_path).bind(&req.value_type)
|
||||
.bind(&req.current_value).bind(&req.default_value).bind(source)
|
||||
.bind(&req.description).bind(requires_restart).bind(&now)
|
||||
.execute(db).await?;
|
||||
|
||||
get_config_item(db, &id).await
|
||||
}
|
||||
|
||||
pub async fn update_config_item(
|
||||
db: &SqlitePool, item_id: &str, req: &UpdateConfigItemRequest,
|
||||
) -> SaasResult<ConfigItemInfo> {
|
||||
let now = chrono::Utc::now().to_rfc3339();
|
||||
let mut updates = Vec::new();
|
||||
let mut params: Vec<String> = Vec::new();
|
||||
|
||||
if let Some(ref v) = req.current_value { updates.push("current_value = ?"); params.push(v.clone()); }
|
||||
if let Some(ref v) = req.source { updates.push("source = ?"); params.push(v.clone()); }
|
||||
if let Some(ref v) = req.description { updates.push("description = ?"); params.push(v.clone()); }
|
||||
|
||||
if updates.is_empty() {
|
||||
return get_config_item(db, item_id).await;
|
||||
}
|
||||
|
||||
updates.push("updated_at = ?");
|
||||
params.push(now);
|
||||
params.push(item_id.to_string());
|
||||
|
||||
let sql = format!("UPDATE config_items SET {} WHERE id = ?", updates.join(", "));
|
||||
let mut query = sqlx::query(&sql);
|
||||
for p in ¶ms {
|
||||
query = query.bind(p);
|
||||
}
|
||||
query.execute(db).await?;
|
||||
|
||||
get_config_item(db, item_id).await
|
||||
}
|
||||
|
||||
pub async fn delete_config_item(db: &SqlitePool, item_id: &str) -> SaasResult<()> {
|
||||
let result = sqlx::query("DELETE FROM config_items WHERE id = ?1")
|
||||
.bind(item_id).execute(db).await?;
|
||||
if result.rows_affected() == 0 {
|
||||
return Err(SaasError::NotFound(format!("配置项 {} 不存在", item_id)));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ============ Config Analysis ============
|
||||
|
||||
pub async fn analyze_config(db: &SqlitePool) -> SaasResult<ConfigAnalysis> {
|
||||
let items = list_config_items(db, &ConfigQuery { category: None, source: None }).await?;
|
||||
|
||||
let mut categories: std::collections::HashMap<String, (i64, i64)> = std::collections::HashMap::new();
|
||||
for item in &items {
|
||||
let entry = categories.entry(item.category.clone()).or_insert((0, 0));
|
||||
entry.0 += 1;
|
||||
if item.source == "saas" {
|
||||
entry.1 += 1;
|
||||
}
|
||||
}
|
||||
|
||||
let category_summaries: Vec<CategorySummary> = categories.into_iter()
|
||||
.map(|(category, (count, saas_managed))| CategorySummary { category, count, saas_managed })
|
||||
.collect();
|
||||
|
||||
Ok(ConfigAnalysis {
|
||||
total_items: items.len() as i64,
|
||||
categories: category_summaries,
|
||||
items,
|
||||
})
|
||||
}
|
||||
|
||||
/// 种子默认配置项
|
||||
pub async fn seed_default_config_items(db: &SqlitePool) -> SaasResult<usize> {
|
||||
let defaults = [
|
||||
("server", "server.host", "string", Some("127.0.0.1"), Some("127.0.0.1"), "服务器监听地址"),
|
||||
("server", "server.port", "integer", Some("4200"), Some("4200"), "服务器端口"),
|
||||
("server", "server.cors_origins", "array", None, None, "CORS 允许的源"),
|
||||
("agent", "agent.defaults.default_model", "string", Some("zhipu/glm-4-plus"), Some("zhipu/glm-4-plus"), "默认模型"),
|
||||
("agent", "agent.defaults.fallback_models", "array", None, None, "回退模型列表"),
|
||||
("agent", "agent.defaults.max_sessions", "integer", Some("10"), Some("10"), "最大并发会话数"),
|
||||
("agent", "agent.defaults.heartbeat_interval", "duration", Some("1h"), Some("1h"), "心跳间隔"),
|
||||
("agent", "agent.defaults.session_timeout", "duration", Some("24h"), Some("24h"), "会话超时"),
|
||||
("memory", "agent.defaults.memory.max_history_length", "integer", Some("100"), Some("100"), "最大历史长度"),
|
||||
("memory", "agent.defaults.memory.summarize_threshold", "integer", Some("50"), Some("50"), "摘要阈值"),
|
||||
("llm", "llm.default_provider", "string", Some("zhipu"), Some("zhipu"), "默认 LLM Provider"),
|
||||
("llm", "llm.temperature", "float", Some("0.7"), Some("0.7"), "默认温度"),
|
||||
("llm", "llm.max_tokens", "integer", Some("4096"), Some("4096"), "默认最大 token 数"),
|
||||
];
|
||||
|
||||
let mut created = 0;
|
||||
let now = chrono::Utc::now().to_rfc3339();
|
||||
|
||||
for (category, key_path, value_type, default_value, current_value, description) in defaults {
|
||||
let existing: Option<(String,)> = sqlx::query_as(
|
||||
"SELECT id FROM config_items WHERE category = ?1 AND key_path = ?2"
|
||||
)
|
||||
.bind(category).bind(key_path)
|
||||
.fetch_optional(db)
|
||||
.await?;
|
||||
|
||||
if existing.is_none() {
|
||||
let id = uuid::Uuid::new_v4().to_string();
|
||||
sqlx::query(
|
||||
"INSERT INTO config_items (id, category, key_path, value_type, current_value, default_value, source, description, requires_restart, created_at, updated_at)
|
||||
VALUES (?1, ?2, ?3, ?4, ?5, ?6, 'local', ?7, 0, ?8, ?8)"
|
||||
)
|
||||
.bind(&id).bind(category).bind(key_path).bind(value_type)
|
||||
.bind(current_value).bind(default_value).bind(description).bind(&now)
|
||||
.execute(db)
|
||||
.await?;
|
||||
created += 1;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(created)
|
||||
}
|
||||
|
||||
// ============ Config Sync ============
|
||||
|
||||
pub async fn sync_config(
|
||||
db: &SqlitePool, account_id: &str, req: &SyncConfigRequest,
|
||||
) -> SaasResult<Vec<ConfigSyncLogInfo>> {
|
||||
let now = chrono::Utc::now().to_rfc3339();
|
||||
let config_keys_str = serde_json::to_string(&req.config_keys)?;
|
||||
let client_values_str = Some(serde_json::to_string(&req.client_values)?);
|
||||
|
||||
// 获取 SaaS 端的配置值
|
||||
let saas_items = list_config_items(db, &ConfigQuery { category: None, source: None }).await?;
|
||||
let saas_values: serde_json::Value = saas_items.iter()
|
||||
.filter(|item| req.config_keys.contains(&item.key_path))
|
||||
.map(|item| {
|
||||
let key = format!("{}.{}", item.category, item.key_path);
|
||||
(key, serde_json::json!({
|
||||
"value": item.current_value,
|
||||
"source": item.source,
|
||||
}))
|
||||
})
|
||||
.collect();
|
||||
|
||||
let saas_values_str = Some(serde_json::to_string(&saas_values)?);
|
||||
|
||||
let resolution = "saas_wins".to_string(); // SaaS 配置优先
|
||||
|
||||
let id = sqlx::query(
|
||||
"INSERT INTO config_sync_log (account_id, client_fingerprint, action, config_keys, client_values, saas_values, resolution, created_at)
|
||||
VALUES (?1, ?2, 'sync', ?3, ?4, ?5, ?6, ?7)"
|
||||
)
|
||||
.bind(account_id).bind(&req.client_fingerprint)
|
||||
.bind(&config_keys_str).bind(&client_values_str)
|
||||
.bind(&saas_values_str).bind(&resolution).bind(&now)
|
||||
.execute(db)
|
||||
.await?;
|
||||
|
||||
let log_id = id.last_insert_rowid();
|
||||
|
||||
// 返回同步结果
|
||||
let row: Option<(i64, String, String, String, String, Option<String>, Option<String>, Option<String>, String)> =
|
||||
sqlx::query_as(
|
||||
"SELECT id, account_id, client_fingerprint, action, config_keys, client_values, saas_values, resolution, created_at
|
||||
FROM config_sync_log WHERE id = ?1"
|
||||
)
|
||||
.bind(log_id)
|
||||
.fetch_optional(db)
|
||||
.await?;
|
||||
|
||||
Ok(row.into_iter().map(|(id, account_id, client_fingerprint, action, config_keys, client_values, saas_values, resolution, created_at)| {
|
||||
ConfigSyncLogInfo { id, account_id, client_fingerprint, action, config_keys, client_values, saas_values, resolution, created_at }
|
||||
}).collect())
|
||||
}
|
||||
|
||||
pub async fn list_sync_logs(
|
||||
db: &SqlitePool, account_id: &str,
|
||||
) -> SaasResult<Vec<ConfigSyncLogInfo>> {
|
||||
let rows: Vec<(i64, String, String, String, String, Option<String>, Option<String>, Option<String>, String)> =
|
||||
sqlx::query_as(
|
||||
"SELECT id, account_id, client_fingerprint, action, config_keys, client_values, saas_values, resolution, created_at
|
||||
FROM config_sync_log WHERE account_id = ?1 ORDER BY created_at DESC LIMIT 50"
|
||||
)
|
||||
.bind(account_id)
|
||||
.fetch_all(db)
|
||||
.await?;
|
||||
|
||||
Ok(rows.into_iter().map(|(id, account_id, client_fingerprint, action, config_keys, client_values, saas_values, resolution, created_at)| {
|
||||
ConfigSyncLogInfo { id, account_id, client_fingerprint, action, config_keys, client_values, saas_values, resolution, created_at }
|
||||
}).collect())
|
||||
}
|
||||
Reference in New Issue
Block a user