Files
zclaw_openfang/crates/zclaw-saas/src/db.rs
iven ef60f9a183 feat(saas): add knowledge base module — categories, items, versions, search, analytics
- 5 knowledge tables (categories, items, chunks, versions, usage) with pgvector + HNSW + GIN indexes
- 23+ API routes covering full CRUD, tree-structured categories, version snapshots
- Keyword-based search with ILIKE + array match (placeholder for vector search)
- Analytics endpoints: overview, trends, top-items, quality, gaps
- Markdown-aware content chunking with overlap strategy
- Worker dispatch for async embedding generation

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-02 00:21:28 +08:00

952 lines
50 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

//! 数据库初始化与 Schema (PostgreSQL)
use sqlx::postgres::PgPoolOptions;
use sqlx::PgPool;
use crate::config::DatabaseConfig;
use crate::error::SaasResult;
const SCHEMA_VERSION: i32 = 13;
/// 初始化数据库
pub async fn init_db(config: &DatabaseConfig) -> SaasResult<PgPool> {
// 环境变量覆盖 URL避免在配置文件中存储密码
let database_url = std::env::var("ZCLAW_DATABASE_URL")
.unwrap_or_else(|_| config.url.clone());
// 环境变量覆盖连接数(向后兼容)
let max_connections: u32 = std::env::var("ZCLAW_DB_MAX_CONNECTIONS")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(config.max_connections);
let min_connections: u32 = std::env::var("ZCLAW_DB_MIN_CONNECTIONS")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(config.min_connections);
tracing::info!(
"Database pool: max={}, min={}, acquire_timeout={}s, idle_timeout={}s, max_lifetime={}s",
max_connections, min_connections,
config.acquire_timeout_secs, config.idle_timeout_secs, config.max_lifetime_secs
);
let pool = PgPoolOptions::new()
.max_connections(max_connections)
.min_connections(min_connections)
.acquire_timeout(std::time::Duration::from_secs(config.acquire_timeout_secs))
.idle_timeout(std::time::Duration::from_secs(config.idle_timeout_secs))
.max_lifetime(std::time::Duration::from_secs(config.max_lifetime_secs))
.connect(&database_url)
.await?;
run_migrations(&pool).await?;
ensure_security_columns(&pool).await?;
seed_admin_account(&pool).await?;
seed_builtin_prompts(&pool).await?;
seed_demo_data(&pool).await?;
fix_seed_data(&pool).await?;
tracing::info!("Database initialized (schema v{})", SCHEMA_VERSION);
Ok(pool)
}
/// 执行数据库迁移
///
/// 优先使用 migrations/ 目录下的 SQL 文件(支持 TIMESTAMPTZ
/// 如果不存在则回退到内联 schema向后兼容 TEXT 时间戳的旧数据库)。
async fn run_migrations(pool: &PgPool) -> SaasResult<()> {
// 检查是否已有 schema已有的数据库保持 TEXT 类型不变)
let existing_version: Option<i32> = sqlx::query_scalar(
"SELECT version FROM saas_schema_version ORDER BY version DESC LIMIT 1"
)
.fetch_optional(pool)
.await
.unwrap_or(None);
match existing_version {
Some(v) if v >= SCHEMA_VERSION => {
tracing::debug!("Schema already at v{}, no migration needed", v);
return Ok(());
}
Some(v) => {
tracing::info!("Schema at v{}, upgrading to v{}", v, SCHEMA_VERSION);
}
None => {
tracing::info!("No schema found, running initial migration");
}
}
// 尝试从 migrations 目录加载 SQL 文件
let migrations_dir = std::path::Path::new("crates/zclaw-saas/migrations");
if migrations_dir.exists() {
run_migration_files(pool, migrations_dir).await?;
} else {
// 回退:使用 migrations/ 的替代路径(开发环境可能在项目根目录)
let alt_dir = std::path::Path::new("migrations");
if alt_dir.exists() {
run_migration_files(pool, alt_dir).await?;
} else {
tracing::warn!("No migrations directory found, schema may be incomplete");
}
}
// 更新 schema 版本
sqlx::query("INSERT INTO saas_schema_version (version) VALUES ($1) ON CONFLICT DO NOTHING")
.bind(SCHEMA_VERSION)
.execute(pool)
.await?;
// Seed roles
seed_roles(pool).await?;
Ok(())
}
/// 从目录加载并执行迁移文件(按文件名排序)
async fn run_migration_files(pool: &PgPool, dir: &std::path::Path) -> SaasResult<()> {
let mut entries: Vec<std::path::PathBuf> = std::fs::read_dir(dir)?
.filter_map(|e| e.ok())
.map(|e| e.path())
.filter(|p| p.extension().map(|ext| ext == "sql").unwrap_or(false))
.collect();
entries.sort();
for path in &entries {
let filename = path.file_name().unwrap_or_default().to_string_lossy();
tracing::info!("Running migration: {}", filename);
let content = std::fs::read_to_string(path)?;
for stmt in split_sql_statements(&content) {
let trimmed = stmt.trim();
if !trimmed.is_empty() && !trimmed.starts_with("--") {
sqlx::query(trimmed).execute(pool).await?;
}
}
}
Ok(())
}
/// 按语句分割 SQL 文件内容,正确处理:
/// - 单引号字符串 `'...'`
/// - 双引号标识符 `"..."`
/// - 美元符号引用字符串 `$$...$$` 和 `$tag$...$tag$`
/// - `--` 单行注释
/// - `/* ... */` 块注释
/// - `E'...'` 转义字符串
fn split_sql_statements(sql: &str) -> Vec<String> {
let mut statements = Vec::new();
let mut current = String::new();
let mut chars = sql.chars().peekable();
while let Some(ch) = chars.next() {
match ch {
'\'' => {
// 单引号字符串
current.push(ch);
loop {
match chars.next() {
Some('\'') => {
current.push('\'');
// 检查是否为转义引号 ''
if chars.peek() == Some(&'\'') {
current.push(chars.next().unwrap());
} else {
break;
}
}
Some(c) => current.push(c),
None => break,
}
}
}
'"' => {
// 双引号标识符
current.push(ch);
loop {
match chars.next() {
Some('"') => {
current.push('"');
break;
}
Some(c) => current.push(c),
None => break,
}
}
}
'-' if chars.peek() == Some(&'-') => {
// 单行注释: 跳过直到行尾
chars.next(); // consume second '-'
while let Some(&c) = chars.peek() {
if c == '\n' {
chars.next();
current.push(c);
break;
}
chars.next();
}
}
'/' if chars.peek() == Some(&'*') => {
// 块注释: 跳过直到 */
chars.next(); // consume '*'
current.push_str("/*");
let mut prev = ' ';
loop {
match chars.next() {
Some('/') if prev == '*' => {
current.push('/');
break;
}
Some(c) => {
current.push(c);
prev = c;
}
None => break,
}
}
}
'$' => {
// 美元符号引用: $$ 或 $tag$ ... $tag$
current.push(ch);
// 读取 tag (字母数字和下划线)
let mut tag = String::new();
while let Some(&c) = chars.peek() {
if c == '$' || c.is_alphanumeric() || c == '_' {
if c == '$' {
chars.next();
current.push(c);
break;
}
chars.next();
tag.push(c);
current.push(c);
} else {
break;
}
}
// 如果 tag 为空,就是 $$ 格式
let end_marker = if tag.is_empty() {
"$$".to_string()
} else {
format!("${}$", tag)
};
// 读取直到遇到 end_marker
let mut buf = String::new();
loop {
match chars.next() {
Some(c) => {
current.push(c);
buf.push(c);
if buf.len() > end_marker.len() {
buf.remove(0);
}
if buf == end_marker {
break;
}
}
None => break,
}
}
}
';' => {
// 语句结束
let trimmed = current.trim().to_string();
if !trimmed.is_empty() {
statements.push(trimmed);
}
current.clear();
}
_ => {
current.push(ch);
}
}
}
// 最后一条语句 (可能不以分号结尾)
let trimmed = current.trim().to_string();
if !trimmed.is_empty() {
statements.push(trimmed);
}
statements
}
/// Seed 角色数据
async fn seed_roles(pool: &PgPool) -> SaasResult<()> {
let now = chrono::Utc::now().to_rfc3339();
sqlx::query(
r#"INSERT INTO roles (id, name, description, permissions, is_system, created_at, updated_at)
VALUES
('super_admin', '超级管理员', '拥有所有权限', '["admin:full","account:admin","provider:manage","model:manage","relay:admin","config:write","prompt:read","prompt:write","prompt:publish","prompt:admin"]', TRUE, $1, $1),
('admin', '管理员', '管理账号和配置', '["account:read","account:admin","provider:manage","model:read","model:manage","relay:use","relay:admin","config:read","config:write","prompt:read","prompt:write","prompt:publish"]', TRUE, $1, $1),
('user', '普通用户', '基础使用权限', '["model:read","relay:use","config:read","prompt:read"]', TRUE, $1, $1)
ON CONFLICT (id) DO NOTHING"#
)
.bind(&now)
.execute(pool)
.await?;
Ok(())
}
/// 如果 accounts 表为空且环境变量已设置,自动创建 super_admin 账号
/// 或者更新现有 admin 用户的角色为 super_admin
pub async fn seed_admin_account(pool: &PgPool) -> SaasResult<()> {
let admin_username = std::env::var("ZCLAW_ADMIN_USERNAME")
.unwrap_or_else(|_| "admin".to_string());
// 检查是否设置了管理员密码
let admin_password = match std::env::var("ZCLAW_ADMIN_PASSWORD") {
Ok(pwd) => pwd,
Err(_) => {
// 没有设置密码,尝试更新现有 admin 用户的角色
let result = sqlx::query(
"UPDATE accounts SET role = 'super_admin' WHERE username = $1 AND role != 'super_admin'"
)
.bind(&admin_username)
.execute(pool)
.await?;
if result.rows_affected() > 0 {
tracing::info!("已将用户 {} 的角色更新为 super_admin", admin_username);
}
return Ok(());
}
};
// 检查 admin 用户是否已存在
let existing: Option<(String,)> = sqlx::query_as(
"SELECT id FROM accounts WHERE username = $1"
)
.bind(&admin_username)
.fetch_optional(pool)
.await?;
if let Some((account_id,)) = existing {
// 更新现有用户的密码和角色(使用 spawn_blocking 避免阻塞 tokio 运行时)
let password_hash = crate::auth::password::hash_password_async(admin_password.clone()).await?;
let now = chrono::Utc::now().to_rfc3339();
sqlx::query(
"UPDATE accounts SET password_hash = $1, role = 'super_admin', updated_at = $2 WHERE id = $3"
)
.bind(&password_hash)
.bind(&now)
.bind(&account_id)
.execute(pool)
.await?;
tracing::info!("已更新用户 {} 的密码和角色为 super_admin", admin_username);
} else {
// 创建新的 super_admin 账号
let password_hash = crate::auth::password::hash_password_async(admin_password.clone()).await?;
let account_id = uuid::Uuid::new_v4().to_string();
let email = format!("{}@zclaw.local", admin_username);
let now = chrono::Utc::now().to_rfc3339();
sqlx::query(
"INSERT INTO accounts (id, username, email, password_hash, display_name, role, status, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, 'super_admin', 'active', $6, $6)"
)
.bind(&account_id)
.bind(&admin_username)
.bind(&email)
.bind(&password_hash)
.bind(&admin_username)
.bind(&now)
.execute(pool)
.await?;
tracing::info!("自动创建 super_admin 账号: username={}, email={}", admin_username, email);
}
Ok(())
}
/// 种子化内置提示词模板(仅当表为空时)
async fn seed_builtin_prompts(pool: &PgPool) -> SaasResult<()> {
let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM prompt_templates")
.fetch_one(pool).await?;
if count.0 > 0 {
return Ok(());
}
let now = chrono::Utc::now().to_rfc3339();
// reflection 提示词
let reflection_id = uuid::Uuid::new_v4().to_string();
let reflection_ver_id = uuid::Uuid::new_v4().to_string();
sqlx::query(
"INSERT INTO prompt_templates (id, name, category, description, source, current_version, status, created_at, updated_at)
VALUES ($1, 'reflection', 'builtin_system', 'Agent 自我反思引擎', 'builtin', 1, 'active', $2, $2)"
).bind(&reflection_id).bind(&now).execute(pool).await?;
sqlx::query(
"INSERT INTO prompt_versions (id, template_id, version, system_prompt, user_prompt_template, variables, changelog, min_app_version, created_at)
VALUES ($1, $2, 1, $3, $4, '[]', '初始版本', NULL, $5)"
).bind(&reflection_ver_id).bind(&reflection_id)
.bind("你是一个 AI Agent 的自我反思引擎。分析最近的对话历史,识别行为模式,并生成改进建议。\n\n输出 JSON 格式:\n{\n \"patterns\": [\n {\n \"observation\": \"观察到的模式描述\",\n \"frequency\": 数字,\n \"sentiment\": \"positive/negative/neutral\",\n \"evidence\": [\"证据1\", \"证据2\"]\n }\n ],\n \"improvements\": [\n {\n \"area\": \"改进领域\",\n \"suggestion\": \"具体建议\",\n \"priority\": \"high/medium/low\"\n }\n ],\n \"identityProposals\": []\n}")
.bind("分析以下对话历史,进行自我反思:\n\n{{context}}\n\n请识别行为模式(积极和消极),并提供具体的改进建议。")
.bind(&now).execute(pool).await?;
// compaction 提示词
let compaction_id = uuid::Uuid::new_v4().to_string();
let compaction_ver_id = uuid::Uuid::new_v4().to_string();
sqlx::query(
"INSERT INTO prompt_templates (id, name, category, description, source, current_version, status, created_at, updated_at)
VALUES ($1, 'compaction', 'builtin_compaction', '对话上下文压缩', 'builtin', 1, 'active', $2, $2)"
).bind(&compaction_id).bind(&now).execute(pool).await?;
sqlx::query(
"INSERT INTO prompt_versions (id, template_id, version, system_prompt, user_prompt_template, variables, changelog, min_app_version, created_at)
VALUES ($1, $2, 1, $3, $4, '[]', '初始版本', NULL, $5)"
).bind(&compaction_ver_id).bind(&compaction_id)
.bind("你是一个对话摘要专家。将长对话压缩为简洁的摘要,保留关键信息。\n\n要求:\n1. 保留所有重要决策和结论\n2. 保留用户偏好和约束\n3. 保留未完成的任务\n4. 保持时间顺序\n5. 摘要应能在后续对话中替代原始内容")
.bind("请将以下对话压缩为简洁摘要,保留关键信息:\n\n{{messages}}")
.bind(&now).execute(pool).await?;
// extraction 提示词
let extraction_id = uuid::Uuid::new_v4().to_string();
let extraction_ver_id = uuid::Uuid::new_v4().to_string();
sqlx::query(
"INSERT INTO prompt_templates (id, name, category, description, source, current_version, status, created_at, updated_at)
VALUES ($1, 'extraction', 'builtin_extraction', '记忆提取引擎', 'builtin', 1, 'active', $2, $2)"
).bind(&extraction_id).bind(&now).execute(pool).await?;
sqlx::query(
"INSERT INTO prompt_versions (id, template_id, version, system_prompt, user_prompt_template, variables, changelog, min_app_version, created_at)
VALUES ($1, $2, 1, $3, $4, '[]', '初始版本', NULL, $5)"
).bind(&extraction_ver_id).bind(&extraction_id)
.bind("你是一个记忆提取专家。从对话中提取值得长期记住的信息。\n\n提取类型:\n- fact: 用户告知的事实(如\"我的公司叫XXX\"\n- preference: 用户的偏好(如\"我喜欢简洁的回答\"\n- lesson: 本次对话的经验教训\n- task: 未完成的任务或承诺\n\n输出 JSON 数组:\n[\n {\n \"content\": \"记忆内容\",\n \"type\": \"fact/preference/lesson/task\",\n \"importance\": 1-10,\n \"tags\": [\"标签1\", \"标签2\"]\n }\n]")
.bind("从以下对话中提取值得长期记住的信息:\n\n{{conversation}}\n\n如果没有值得记忆的内容,返回空数组 []。")
.bind(&now).execute(pool).await?;
tracing::info!("Seeded 3 builtin prompt templates (reflection, compaction, extraction)");
Ok(())
}
/// 种子化演示数据 (Admin UI 演示用,幂等: ON CONFLICT DO NOTHING)
async fn seed_demo_data(pool: &PgPool) -> SaasResult<()> {
// 只在 providers 为空时 seed避免重复插入
let count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM providers")
.fetch_one(pool).await?;
if count.0 > 0 {
tracing::debug!("Demo data already exists, skipping seed");
return Ok(());
}
tracing::info!("Seeding demo data for Admin UI...");
// 获取 admin account id
let admin: Option<(String,)> = sqlx::query_as(
"SELECT id FROM accounts WHERE role = 'super_admin' LIMIT 1"
).fetch_optional(pool).await?;
let admin_id = admin.map(|(id,)| id).unwrap_or_else(|| "demo-admin".to_string());
let now = chrono::Utc::now();
// ===== 1. Providers =====
let providers = [
("demo-openai", "openai", "OpenAI", "https://api.openai.com/v1", true, 60, 100000),
("demo-anthropic", "anthropic", "Anthropic", "https://api.anthropic.com/v1", true, 50, 80000),
("demo-google", "google", "Google AI", "https://generativelanguage.googleapis.com/v1beta", true, 30, 60000),
("demo-deepseek", "deepseek", "DeepSeek", "https://api.deepseek.com/v1", true, 30, 50000),
("demo-local", "local-ollama", "本地 Ollama", "http://localhost:11434/v1", false, 10, 20000),
];
for (id, name, display, url, enabled, rpm, tpm) in &providers {
let ts = now.to_rfc3339();
sqlx::query(
"INSERT INTO providers (id, name, display_name, base_url, api_protocol, enabled, rate_limit_rpm, rate_limit_tpm, created_at, updated_at)
VALUES ($1, $2, $3, $4, 'openai', $5, $6, $7, $8, $8) ON CONFLICT (id) DO NOTHING"
).bind(id).bind(name).bind(display).bind(url).bind(*enabled).bind(*rpm as i64).bind(*tpm as i64).bind(&ts)
.execute(pool).await?;
}
// ===== 2. Models =====
let models = [
// OpenAI models
("demo-gpt4o", "demo-openai", "gpt-4o", "GPT-4o", 128000, 16384, true, true, 0.005, 0.015),
("demo-gpt4o-mini", "demo-openai", "gpt-4o-mini", "GPT-4o Mini", 128000, 16384, true, false, 0.00015, 0.0006),
("demo-gpt4-turbo", "demo-openai", "gpt-4-turbo", "GPT-4 Turbo", 128000, 4096, true, true, 0.01, 0.03),
("demo-o1", "demo-openai", "o1", "o1", 200000, 100000, true, true, 0.015, 0.06),
("demo-o3-mini", "demo-openai", "o3-mini", "o3-mini", 200000, 65536, true, false, 0.0011, 0.0044),
// Anthropic models
("demo-claude-sonnet", "demo-anthropic", "claude-sonnet-4-20250514", "Claude Sonnet 4", 200000, 64000, true, true, 0.003, 0.015),
("demo-claude-haiku", "demo-anthropic", "claude-haiku-4-20250414", "Claude Haiku 4", 200000, 8192, true, true, 0.0008, 0.004),
("demo-claude-opus", "demo-anthropic", "claude-opus-4-20250115", "Claude Opus 4", 200000, 32000, true, true, 0.015, 0.075),
// Google models
("demo-gemini-pro", "demo-google", "gemini-2.5-pro", "Gemini 2.5 Pro", 1048576, 65536, true, true, 0.00125, 0.005),
("demo-gemini-flash", "demo-google", "gemini-2.5-flash", "Gemini 2.5 Flash", 1048576, 65536, true, true, 0.000075, 0.0003),
// DeepSeek models
("demo-deepseek-chat", "demo-deepseek", "deepseek-chat", "DeepSeek Chat", 65536, 8192, true, false, 0.00014, 0.00028),
("demo-deepseek-reasoner", "demo-deepseek", "deepseek-reasoner", "DeepSeek R1", 65536, 8192, true, false, 0.00055, 0.00219),
];
for (id, pid, mid, alias, ctx, max_out, stream, vision, price_in, price_out) in &models {
let ts = now.to_rfc3339();
sqlx::query(
"INSERT INTO models (id, provider_id, model_id, alias, context_window, max_output_tokens, supports_streaming, supports_vision, enabled, pricing_input, pricing_output, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, true, $9, $10, $11, $11) ON CONFLICT (id) DO NOTHING"
).bind(id).bind(pid).bind(mid).bind(alias)
.bind(*ctx as i64).bind(*max_out as i64).bind(*stream).bind(*vision)
.bind(*price_in).bind(*price_out).bind(&ts)
.execute(pool).await?;
}
// ===== 3. Provider Keys (Key Pool) =====
let provider_keys = [
("demo-key-o1", "demo-openai", "OpenAI Key 1", "sk-demo-openai-key-1-xxxxx", 0, 60, 100000),
("demo-key-o2", "demo-openai", "OpenAI Key 2", "sk-demo-openai-key-2-xxxxx", 1, 40, 80000),
("demo-key-a1", "demo-anthropic", "Anthropic Key 1", "sk-ant-demo-key-1-xxxxx", 0, 50, 80000),
("demo-key-g1", "demo-google", "Google Key 1", "AIzaSyDemoKey1xxxxx", 0, 30, 60000),
("demo-key-d1", "demo-deepseek", "DeepSeek Key 1", "sk-demo-deepseek-key-1-xxxxx", 0, 30, 50000),
];
for (id, pid, label, kv, priority, rpm, tpm) in &provider_keys {
let ts = now.to_rfc3339();
sqlx::query(
"INSERT INTO provider_keys (id, provider_id, key_label, key_value, priority, max_rpm, max_tpm, is_active, total_requests, total_tokens, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, true, 0, 0, $8, $8) ON CONFLICT (id) DO NOTHING"
).bind(id).bind(pid).bind(label).bind(kv).bind(*priority as i32)
.bind(*rpm as i64).bind(*tpm as i64).bind(&ts)
.execute(pool).await?;
}
// ===== 4. Usage Records (past 30 days) =====
let models_for_usage = [
("demo-openai", "gpt-4o"),
("demo-openai", "gpt-4o-mini"),
("demo-anthropic", "claude-sonnet-4-20250514"),
("demo-google", "gemini-2.5-flash"),
("demo-deepseek", "deepseek-chat"),
];
let mut rng_seed = 42u64;
for day_offset in 0..30 {
let day = now - chrono::Duration::days(29 - day_offset);
// 每天 20~80 条 usage
let daily_count = 20 + (rng_seed % 60) as i32;
rng_seed = rng_seed.wrapping_mul(6364136223846793005).wrapping_add(1);
for i in 0..daily_count {
let (provider_id, model_id) = models_for_usage[(rng_seed as usize) % models_for_usage.len()];
rng_seed = rng_seed.wrapping_mul(6364136223846793005).wrapping_add(1);
let hour = rng_seed as i32 % 24;
rng_seed = rng_seed.wrapping_mul(6364136223846793005).wrapping_add(1);
let ts = (day + chrono::Duration::hours(hour as i64) + chrono::Duration::minutes(i as i64)).to_rfc3339();
let input = (500 + (rng_seed % 8000)) as i32;
rng_seed = rng_seed.wrapping_mul(6364136223846793005).wrapping_add(1);
let output = (200 + (rng_seed % 4000)) as i32;
rng_seed = rng_seed.wrapping_mul(6364136223846793005).wrapping_add(1);
let latency = (100 + (rng_seed % 3000)) as i32;
rng_seed = rng_seed.wrapping_mul(6364136223846793005).wrapping_add(1);
let status = if rng_seed % 20 == 0 { "failed" } else { "success" };
rng_seed = rng_seed.wrapping_mul(6364136223846793005).wrapping_add(1);
sqlx::query(
"INSERT INTO usage_records (account_id, provider_id, model_id, input_tokens, output_tokens, latency_ms, status, created_at)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)"
).bind(&admin_id).bind(provider_id).bind(model_id)
.bind(input).bind(output).bind(latency).bind(status).bind(&ts)
.execute(pool).await?;
}
}
// ===== 5. Relay Tasks (recent) =====
let relay_statuses = ["completed", "completed", "completed", "completed", "failed", "completed", "queued"];
for i in 0..20 {
let (provider_id, model_id) = models_for_usage[i % models_for_usage.len()];
let status = relay_statuses[i % relay_statuses.len()];
let offset_hours = (20 - i) as i64;
let ts = (now - chrono::Duration::hours(offset_hours)).to_rfc3339();
let ts_completed = (now - chrono::Duration::hours(offset_hours) + chrono::Duration::seconds(3)).to_rfc3339();
let task_id = uuid::Uuid::new_v4().to_string();
let hash = format!("{:064x}", i);
let body = format!(r#"{{"model":"{}","messages":[{{"role":"user","content":"demo request {}"}}]}}"#, model_id, i);
let (in_tok, out_tok, err) = if status == "completed" {
(1500 + i as i32 * 100, 800 + i as i32 * 50, None::<String>)
} else if status == "failed" {
(0, 0, Some("Connection timeout".to_string()))
} else {
(0, 0, None)
};
sqlx::query(
"INSERT INTO relay_tasks (id, account_id, provider_id, model_id, request_hash, status, priority, attempt_count, max_attempts, request_body, input_tokens, output_tokens, error_message, queued_at, started_at, completed_at, created_at)
VALUES ($1, $2, $3, $4, $5, $6, 0, 1, 3, $7, $8, $9, $10, $11, $12, $13, $11)"
).bind(&task_id).bind(&admin_id).bind(provider_id).bind(model_id)
.bind(&hash).bind(status).bind(&body)
.bind(in_tok).bind(out_tok).bind(err.as_deref())
.bind(&ts).bind(&ts).bind(if status == "queued" { None::<&str> } else { Some(ts_completed.as_str()) })
.execute(pool).await?;
}
// ===== 6. Agent Templates =====
// Each tuple: (id, name, description, category, model, system_prompt, tools, capabilities, temperature, max_tokens,
// soul_content, scenarios, welcome_message, quick_commands, personality, communication_style, emoji, source_id)
let agent_templates: [(&str, &str, &str, &str, &str, &str, &str, &str, f64, i32,
&str, &str, &str, &str, &str, &str, &str, &str); 6] = [
("demo-agent-coder", "Code Assistant", "A helpful coding assistant that can write, review, and debug code",
"coding", "gpt-4o",
"You are an expert coding assistant. Help users write clean, efficient code.",
"[\"code_search\",\"code_edit\",\"terminal\"]", "[\"code_generation\",\"code_review\",\"debugging\"]",
0.3, 8192,
"你是一位资深全栈工程师,擅长代码编写、评审和调试。你追求简洁高效的代码风格,注重可读性和可维护性。",
"[\"代码编写\",\"代码审查\",\"Bug调试\",\"架构设计\"]",
"你好!我是你的编程助手,有什么代码问题可以随时问我。",
"[{\"label\":\"写一个函数\",\"command\":\"帮我写一个\"},{\"label\":\"审查代码\",\"command\":\"请审查这段代码\"},{\"label\":\"解释代码\",\"command\":\"解释一下这段代码\"}]",
"professional", "concise", "💻", "code-assistant-v1"),
("demo-agent-writer", "Content Writer", "Creative writing and content generation agent",
"creative", "claude-sonnet-4-20250514",
"You are a skilled content writer. Create engaging, well-structured content.",
"[\"web_search\",\"document_edit\"]", "[\"writing\",\"editing\",\"summarization\"]",
0.7, 4096,
"你是一位创意写作专家,擅长各类文案创作、内容编辑和摘要生成。你善于把握文字的节奏和情感表达。",
"[\"文章写作\",\"文案创作\",\"内容编辑\",\"摘要生成\"]",
"你好!我是你的内容创作助手,需要写点什么?",
"[{\"label\":\"写一篇文章\",\"command\":\"帮我写一篇关于\"},{\"label\":\"润色文案\",\"command\":\"帮我优化这段文字\"},{\"label\":\"生成摘要\",\"command\":\"请为以下内容生成摘要\"}]",
"creative", "warm", "✍️", "content-writer-v1"),
("demo-agent-analyst", "Data Analyst", "Data analysis and visualization specialist",
"analytics", "gpt-4o",
"You are a data analysis expert. Help users analyze data and create visualizations.",
"[\"code_execution\",\"data_access\"]", "[\"data_analysis\",\"visualization\",\"statistics\"]",
0.2, 8192,
"你是一位数据分析专家,擅长统计分析、数据可视化和洞察提取。你善于从数据中发现有价值的模式和趋势。",
"[\"数据分析\",\"可视化报表\",\"统计建模\",\"趋势预测\"]",
"你好!我是你的数据分析助手,请分享你的数据或问题。",
"[{\"label\":\"分析数据\",\"command\":\"帮我分析这组数据\"},{\"label\":\"生成图表\",\"command\":\"为以下数据生成图表\"},{\"label\":\"统计摘要\",\"command\":\"请给出统计摘要\"}]",
"analytical", "structured", "📊", "data-analyst-v1"),
("demo-agent-researcher", "Research Agent", "Deep research and information synthesis agent",
"research", "gemini-2.5-pro",
"You are a research specialist. Conduct thorough research and synthesize findings.",
"[\"web_search\",\"document_access\"]", "[\"research\",\"synthesis\",\"citation\"]",
0.4, 16384,
"你是一位深度研究专家,擅长信息检索、文献综述和知识综合。你注重信息来源的可靠性和引用的准确性。",
"[\"深度研究\",\"文献综述\",\"信息检索\",\"知识综合\"]",
"你好!我是你的研究助手,需要我帮你调查什么话题?",
"[{\"label\":\"深度研究\",\"command\":\"请深入研究\"},{\"label\":\"文献综述\",\"command\":\"帮我写一份文献综述\"},{\"label\":\"对比分析\",\"command\":\"请对比分析\"}]",
"scholarly", "detailed", "🔬", "research-agent-v1"),
("demo-agent-translator", "Translator", "Multi-language translation agent",
"utility", "deepseek-chat",
"You are a professional translator. Translate text accurately while preserving tone and context.",
"[]", "[\"translation\",\"localization\"]",
0.3, 4096,
"你是一位专业翻译,精通中英日韩等多种语言。你注重准确传达原文含义,同时保持目标语言的自然流畅。",
"[\"文本翻译\",\"文档本地化\",\"术语管理\",\"双语校对\"]",
"你好!我是你的翻译助手,请发送需要翻译的文本。",
"[{\"label\":\"中译英\",\"command\":\"请将以下中文翻译为英文\"},{\"label\":\"英译中\",\"command\":\"请将以下英文翻译为中文\"},{\"label\":\"润色译文\",\"command\":\"请润色这段翻译\"}]",
"professional", "precise", "🌐", "translator-v1"),
("demo-agent-medical", "医疗助手", "Clinical decision support and medical literature assistant",
"healthcare", "gpt-4o",
"You are a medical AI assistant. Help with clinical decision support, literature retrieval, and medication reference. Always remind users that your suggestions do not replace professional medical advice.",
"[\"web_search\",\"document_access\"]", "[\"clinical_support\",\"literature_search\",\"diagnosis_assist\",\"medication_ref\"]",
0.2, 16384,
"你是一位医疗AI助手具备丰富的临床知识。你辅助临床决策、文献检索和用药参考。\n\n重要提示:\n- 你的建议仅供医疗专业人员参考\n- 不能替代正式的医疗诊断\n- 涉及患者安全的问题需格外谨慎\n- 始终建议用户咨询专业医生",
"[\"临床辅助\",\"文献检索\",\"诊断建议\",\"用药参考\"]",
"你好我是你的医疗AI助手。我可以帮助你进行临床决策支持、医学文献检索和用药参考。请注意我的建议仅供参考不能替代专业医疗意见。",
"[{\"label\":\"药物查询\",\"command\":\"查询药物\"},{\"label\":\"文献检索\",\"command\":\"检索相关文献\"},{\"label\":\"临床辅助\",\"command\":\"辅助临床分析\"},{\"label\":\"诊断建议\",\"command\":\"请给出诊断建议\"}]",
"professional", "cautious", "🏥", "medical-assistant-v1"),
];
for (id, name, desc, cat, model, prompt, tools, caps, temp, max_tok,
soul, scenarios, welcome, quick_cmds, personality, comm_style, emoji, source_id) in &agent_templates {
let ts = now.to_rfc3339();
sqlx::query(
"INSERT INTO agent_templates (id, name, description, category, source, model, system_prompt, tools, capabilities,
temperature, max_tokens, visibility, status, current_version, created_at, updated_at,
soul_content, scenarios, welcome_message, quick_commands, personality, communication_style, emoji, version, source_id)
VALUES ($1,$2,$3,$4,'custom',$5,$6,$7,$8,$9,$10,'public','active',1,$11,$11,$12,$13,$14,$15,$16,$17,$18,1,$19)
ON CONFLICT (id) DO NOTHING"
).bind(id).bind(name).bind(desc).bind(cat).bind(model).bind(prompt).bind(tools).bind(caps)
.bind(*temp).bind(*max_tok).bind(&ts)
.bind(soul).bind(scenarios).bind(welcome).bind(quick_cmds)
.bind(personality).bind(comm_style).bind(emoji).bind(source_id)
.execute(pool).await?;
}
// ===== 7. Config Items =====
// 分类名必须与 Admin V2 Config 页面 Tab key 一致: general/auth/relay/model/rate_limit/log
let config_items = [
("general", "max_connections", "integer", "50", "100", "最大数据库连接数"),
("general", "request_timeout_sec", "integer", "30", "60", "请求超时秒数"),
("general", "app_name", "string", "ZCLAW", "ZCLAW", "应用显示名称"),
("general", "debug_mode", "boolean", "false", "false", "调试模式"),
("auth", "session_ttl_hours", "integer", "24", "48", "会话有效期(小时)"),
("auth", "refresh_token_ttl_days", "integer", "7", "30", "刷新令牌有效期(天)"),
("auth", "max_login_attempts", "integer", "5", "10", "最大登录尝试次数"),
("auth", "totp_enabled", "boolean", "false", "false", "启用 TOTP 两步验证"),
("relay", "max_retries", "integer", "3", "5", "最大重试次数"),
("relay", "retry_delay_sec", "integer", "5", "10", "重试延迟秒数"),
("relay", "stream_timeout_sec", "integer", "120", "300", "流式响应超时秒数"),
("relay", "max_concurrent_tasks", "integer", "10", "20", "最大并发中转任务"),
("model", "default_model", "string", "gpt-4o", "gpt-4o", "默认 LLM 模型"),
("model", "max_context_tokens", "integer", "128000", "128000", "最大上下文窗口"),
("model", "stream_chunk_size", "integer", "1024", "1024", "流式响应块大小(bytes)"),
("model", "temperature", "number", "0.7", "0.7", "默认温度参数"),
("rate_limit", "rate_limit_enabled", "boolean", "true", "true", "启用请求限流"),
("rate_limit", "max_requests_per_minute", "integer", "60", "120", "每分钟最大请求数"),
("rate_limit", "burst_size", "integer", "10", "20", "突发请求上限"),
("rate_limit", "content_filter_enabled", "boolean", "true", "true", "启用内容过滤"),
("log", "log_level", "string", "info", "info", "日志级别"),
("log", "log_retention_days", "integer", "30", "90", "日志保留天数"),
("log", "audit_log_enabled", "boolean", "true", "true", "启用审计日志"),
("log", "slow_query_threshold_ms", "integer", "1000", "2000", "慢查询阈值(ms)"),
];
for (cat, key, vtype, current, default, desc) in &config_items {
let ts = now.to_rfc3339();
let id = format!("cfg-{}-{}", cat, key);
sqlx::query(
"INSERT INTO config_items (id, category, key_path, value_type, current_value, default_value, source, description, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, 'local', $7, $8, $8) ON CONFLICT (id) DO NOTHING"
).bind(&id).bind(cat).bind(key).bind(vtype).bind(current).bind(default).bind(desc).bind(&ts)
.execute(pool).await?;
}
// ===== 8. Account API Keys (account_api_keys 表) =====
let account_api_keys = [
("demo-akey-1", "demo-openai", "sk-demo-openai-key-1-xxxxx", "OpenAI API Key", "[\"relay:use\",\"model:read\"]"),
("demo-akey-2", "demo-anthropic", "sk-ant-demo-key-1-xxxxx", "Anthropic API Key", "[\"relay:use\",\"model:read\",\"config:read\"]"),
("demo-akey-3", "demo-deepseek", "sk-demo-deepseek-key-1-xxxxx", "DeepSeek API Key", "[\"relay:use\"]"),
];
for (id, provider_id, key_val, label, perms) in &account_api_keys {
let ts = now.to_rfc3339();
sqlx::query(
"INSERT INTO account_api_keys (id, account_id, provider_id, key_value, key_label, permissions, enabled, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, true, $7, $7) ON CONFLICT (id) DO NOTHING"
).bind(id).bind(&admin_id).bind(provider_id).bind(key_val).bind(label).bind(perms).bind(&ts)
.execute(pool).await?;
}
// 保留旧 api_tokens 表的种子数据(兼容旧代码路径)
let api_tokens = [
("demo-token-1", "Production API Key", "zclaw_prod_xr7Km9pQ2nBv", "[\"relay:use\",\"model:read\"]"),
("demo-token-2", "Development Key", "zclaw_dev_aB3cD5eF7gH9", "[\"relay:use\",\"model:read\",\"config:read\"]"),
("demo-token-3", "Testing Key", "zclaw_test_jK4lM6nO8pQ0", "[\"relay:use\"]"),
];
for (id, name, prefix, perms) in &api_tokens {
let ts = now.to_rfc3339();
let hash = {
use sha2::{Sha256, Digest};
hex::encode(Sha256::digest(format!("{}-dummy-hash", id).as_bytes()))
};
sqlx::query(
"INSERT INTO api_tokens (id, account_id, name, token_hash, token_prefix, permissions, created_at)
VALUES ($1, $2, $3, $4, $5, $6, $7) ON CONFLICT (id) DO NOTHING"
).bind(id).bind(&admin_id).bind(name).bind(&hash).bind(prefix).bind(perms).bind(&ts)
.execute(pool).await?;
}
// ===== 9. Operation Logs =====
let log_actions = [
("account.login", "account", "User login"),
("provider.create", "provider", "Created provider"),
("provider.update", "provider", "Updated provider config"),
("model.create", "model", "Added model configuration"),
("relay.request", "relay_task", "Relay request processed"),
("config.update", "config", "Updated system configuration"),
("account.create", "account", "New account registered"),
("api_key.create", "api_token", "Created API token"),
("prompt.update", "prompt", "Updated prompt template"),
("account.change_password", "account", "Password changed"),
("relay.retry", "relay_task", "Retried failed relay task"),
("provider_key.add", "provider_key", "Added provider key to pool"),
];
// 最近 50 条日志,散布在过去 7 天
for i in 0..50 {
let (action, target_type, _detail) = log_actions[i % log_actions.len()];
let offset_hours = (i * 3 + 1) as i64;
let ts = (now - chrono::Duration::hours(offset_hours)).to_rfc3339();
let detail = serde_json::json!({"index": i}).to_string();
sqlx::query(
"INSERT INTO operation_logs (account_id, action, target_type, target_id, details, ip_address, created_at)
VALUES ($1, $2, $3, $4, $5, $6, $7)"
).bind(&admin_id).bind(action).bind(target_type)
.bind(&admin_id).bind(&detail).bind("127.0.0.1").bind(&ts)
.execute(pool).await?;
}
// ===== 10. Telemetry Reports =====
let telem_models = ["gpt-4o", "claude-sonnet-4-20250514", "gemini-2.5-flash", "deepseek-chat"];
for day_offset in 0i32..14 {
let day = now - chrono::Duration::days(13 - day_offset as i64);
for h in 0i32..8 {
let ts = (day + chrono::Duration::hours(h as i64 * 3)).to_rfc3339();
let model = telem_models[(day_offset as usize + h as usize) % telem_models.len()];
let report_id = format!("telem-d{}-h{}", day_offset, h);
let input = 1000 + (day_offset as i64 * 100 + h as i64 * 50);
let output = 500 + (day_offset as i64 * 50 + h as i64 * 30);
let latency = 200 + (day_offset * 10 + h * 5);
sqlx::query(
"INSERT INTO telemetry_reports (id, account_id, device_id, app_version, model_id, input_tokens, output_tokens, latency_ms, success, connection_mode, reported_at, created_at)
VALUES ($1, $2, 'demo-device-001', '0.1.0', $3, $4, $5, $6, true, 'tauri', $7, $7) ON CONFLICT (id) DO NOTHING"
).bind(&report_id).bind(&admin_id).bind(model)
.bind(input).bind(output).bind(latency).bind(&ts)
.execute(pool).await?;
}
}
tracing::info!("Demo data seeded: 5 providers, 12 models, 5 keys, ~1500 usage records, 20 relay tasks, 6 agent templates, 12 configs, 3 API tokens, 50 logs, 112 telemetry reports");
Ok(())
}
/// 修复旧种子数据:更新 config_items 分类名 + 补充 account_api_keys + 更新旧数据 account_id
///
/// 历史问题:
/// - 旧 config_items 使用 server/llm/agent/memory/security 分类,与 Admin V2 前端 Tab 不匹配
/// - 旧种子将 API Keys 写入 api_tokens 表,但 handler 读 account_api_keys 表
/// - 旧种子数据的 account_id 可能与当前 admin 不匹配
async fn fix_seed_data(pool: &PgPool) -> SaasResult<()> {
let now = chrono::Utc::now().to_rfc3339();
// 1. 获取所有 super_admin account_id可能有多个
let admins: Vec<(String,)> = sqlx::query_as(
"SELECT id FROM accounts WHERE role = 'super_admin'"
).fetch_all(pool).await?;
if admins.is_empty() {
return Ok(());
}
let admin_ids: Vec<String> = admins.into_iter().map(|(id,)| id).collect();
// 2. 更新 config_items 分类名(旧 → 新)
let category_mappings = [
("server", "general"),
("llm", "model"),
("agent", "general"),
("memory", "general"),
("security", "rate_limit"),
];
for (old_cat, new_cat) in &category_mappings {
let result = sqlx::query(
"UPDATE config_items SET category = $1, updated_at = $2 WHERE category = $3"
).bind(new_cat).bind(&now).bind(old_cat)
.execute(pool).await?;
if result.rows_affected() > 0 {
tracing::info!("Fixed config_items category: {} → {} ({} rows)", old_cat, new_cat, result.rows_affected());
}
}
// 如果新分类没有数据,补种默认配置项(幂等 ON CONFLICT DO NOTHING
let general_count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM config_items WHERE category = 'general'")
.fetch_one(pool).await?;
if general_count.0 == 0 {
let new_configs = [
("general", "max_connections", "integer", "50", "100", "最大数据库连接数"),
("general", "request_timeout_sec", "integer", "30", "60", "请求超时秒数"),
("general", "app_name", "string", "ZCLAW", "ZCLAW", "应用显示名称"),
("auth", "session_ttl_hours", "integer", "24", "48", "会话有效期(小时)"),
("relay", "max_retries", "integer", "3", "5", "最大重试次数"),
("model", "default_model", "string", "gpt-4o", "gpt-4o", "默认 LLM 模型"),
("rate_limit", "rate_limit_enabled", "boolean", "true", "true", "启用请求限流"),
("log", "log_level", "string", "info", "info", "日志级别"),
];
for (cat, key, vtype, current, default, desc) in &new_configs {
let id = format!("cfg-{}-{}", cat, key);
sqlx::query(
"INSERT INTO config_items (id, category, key_path, value_type, current_value, default_value, source, description, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, 'local', $7, $8, $8) ON CONFLICT (id) DO NOTHING"
).bind(&id).bind(cat).bind(key).bind(vtype).bind(current).bind(default).bind(desc).bind(&now)
.execute(pool).await?;
}
tracing::info!("Seeded {} new config items for updated categories", new_configs.len());
}
// 3. 补种 account_api_keys幂等 ON CONFLICT DO NOTHING— 为每个 admin 补种
let provider_keys: Vec<(String, String)> = sqlx::query_as(
"SELECT id, provider_id FROM providers LIMIT 5"
).fetch_all(pool).await.unwrap_or_default();
for admin_id in &admin_ids {
let akey_count: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM account_api_keys WHERE account_id = $1")
.bind(admin_id).fetch_one(pool).await?;
if akey_count.0 > 0 { continue; }
let demo_keys = [
(format!("demo-akey-1-{}", &admin_id[..8]), "OpenAI API Key", "sk-demo-openai-key-1-xxxxx", "[\"relay:use\",\"model:read\"]"),
(format!("demo-akey-2-{}", &admin_id[..8]), "Anthropic API Key", "sk-ant-demo-key-1-xxxxx", "[\"relay:use\",\"model:read\"]"),
(format!("demo-akey-3-{}", &admin_id[..8]), "DeepSeek API Key", "sk-demo-deepseek-key-1-xxxxx", "[\"relay:use\"]"),
];
for (idx, (id, label, key_val, perms)) in demo_keys.iter().enumerate() {
let provider_id = provider_keys.get(idx).map(|(_, pid)| pid.as_str()).unwrap_or("demo-openai");
sqlx::query(
"INSERT INTO account_api_keys (id, account_id, provider_id, key_value, key_label, permissions, enabled, created_at, updated_at)
VALUES ($1, $2, $3, $4, $5, $6, true, $7, $7) ON CONFLICT (id) DO NOTHING"
).bind(id).bind(admin_id).bind(provider_id).bind(key_val).bind(label).bind(perms).bind(&now)
.execute(pool).await?;
}
tracing::info!("Seeded {} account_api_keys for admin {}", demo_keys.len(), admin_id);
}
// 4. 更新旧种子数据 — 将所有 relay_tasks/usage_records/operation_logs 等的 account_id
// 更新为每个 super_admin 都能看到(复制或统一)
// 策略:统一为第一个 super_admin然后为其余 admin 也复制关键数据
let primary_admin = &admin_ids[0];
for table in &["relay_tasks", "usage_records", "operation_logs", "telemetry_reports"] {
// 统计该表有多少不同的 account_id
let distinct_count: (i64,) = sqlx::query_as(
&format!("SELECT COUNT(DISTINCT account_id) FROM {}", table)
).fetch_one(pool).await.unwrap_or((0,));
if distinct_count.0 > 0 {
// 将所有非 primary_admin 的数据更新为 primary_admin
let result = sqlx::query(
&format!("UPDATE {} SET account_id = $1 WHERE account_id != $1", table)
).bind(primary_admin)
.execute(pool).await?;
if result.rows_affected() > 0 {
tracing::info!("Unified {} account_id to {} ({} rows fixed)", table, primary_admin, result.rows_affected());
}
}
}
// 也更新 api_tokens 表的 account_id
let _ = sqlx::query("UPDATE api_tokens SET account_id = $1 WHERE account_id != $1")
.bind(primary_admin).execute(pool).await?;
tracing::info!("Seed data fix completed");
Ok(())
}
/// 防御性检查:确保安全审计新增的列存在(即使 schema_version 显示已是最新)
///
/// 场景:旧数据库的 schema_version 已被手动更新但迁移文件未实际执行,
/// 或者迁移文件在 version check 时被跳过。
async fn ensure_security_columns(pool: &PgPool) -> SaasResult<()> {
// 检查 password_version 列是否存在
let col_exists: bool = sqlx::query_scalar(
"SELECT EXISTS (SELECT 1 FROM information_schema.columns WHERE table_name = 'accounts' AND column_name = 'password_version')"
)
.fetch_one(pool)
.await
.unwrap_or(false);
if !col_exists {
tracing::warn!("[DB] 'password_version' column missing — applying security fix migration");
sqlx::query("ALTER TABLE accounts ADD COLUMN IF NOT EXISTS password_version INTEGER NOT NULL DEFAULT 1")
.execute(pool).await?;
sqlx::query("ALTER TABLE accounts ADD COLUMN IF NOT EXISTS failed_login_count INTEGER NOT NULL DEFAULT 0")
.execute(pool).await?;
sqlx::query("ALTER TABLE accounts ADD COLUMN IF NOT EXISTS locked_until TIMESTAMPTZ")
.execute(pool).await?;
tracing::info!("[DB] Security columns (password_version, failed_login_count, locked_until) applied");
}
// 检查 rate_limit_events 表是否存在
let table_exists: bool = sqlx::query_scalar(
"SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'rate_limit_events')"
)
.fetch_one(pool)
.await
.unwrap_or(false);
if !table_exists {
tracing::warn!("[DB] 'rate_limit_events' table missing — applying rate limit migration");
if let Err(e) = sqlx::query(
"CREATE TABLE IF NOT EXISTS rate_limit_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
key TEXT NOT NULL,
count BIGINT NOT NULL DEFAULT 1,
window_start TIMESTAMPTZ NOT NULL DEFAULT NOW(),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
)"
).execute(pool).await {
tracing::warn!("[DB] Failed to create rate_limit_events: {}", e);
}
}
Ok(())
}
#[cfg(test)]
mod tests {
// PostgreSQL 单元测试需要真实数据库连接,此处保留接口兼容
// 集成测试见 tests/integration_test.rs
}