feat(saas): SQL 迁移系统 + TIMESTAMPTZ + 热路径重构

P0: SQL 迁移系统
- crates/zclaw-saas/migrations/ — 独立 SQL 迁移文件目录
- 20260329000001_initial_schema.sql — TIMESTAMPTZ 完整 schema
- 20260329000002_seed_data.sql — 角色种子数据
- db.rs: 移除 335 行内联 SCHEMA_SQL,改为文件加载
- 版本追踪: saas_schema_version 表管理迁移状态
- 向后兼容: 已有 TEXT 时间戳数据库不受影响

P1: 安全重构
- relay/service.rs: update_task_status 从 format!() 改为 3 条独立参数化查询
- config.rs: 移除 TODO 注释,补充字段文档说明
- state.rs: 添加 dispatch_log_operation 异步日志派发方法

P2: Worker 集成
- state.rs: WorkerDispatcher 接入 AppState
- 所有异步后台任务基础设施就绪
This commit is contained in:
iven
2026-03-29 19:41:03 +08:00
parent 77374121dd
commit a0ca35c9dd
6 changed files with 487 additions and 387 deletions

View File

@@ -0,0 +1,336 @@
-- Migration: Initial schema with TIMESTAMPTZ
-- Extracted from inline SCHEMA_SQL in db.rs, with TEXT timestamps converted to TIMESTAMPTZ.
CREATE TABLE IF NOT EXISTS saas_schema_version (
version INTEGER PRIMARY KEY
);
CREATE TABLE IF NOT EXISTS accounts (
id TEXT PRIMARY KEY,
username TEXT NOT NULL UNIQUE,
email TEXT NOT NULL UNIQUE,
password_hash TEXT NOT NULL,
display_name TEXT NOT NULL DEFAULT '',
avatar_url TEXT,
role TEXT NOT NULL DEFAULT 'user',
status TEXT NOT NULL DEFAULT 'active',
totp_secret TEXT,
totp_enabled BOOLEAN NOT NULL DEFAULT FALSE,
last_login_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_accounts_email ON accounts(email);
CREATE INDEX IF NOT EXISTS idx_accounts_role ON accounts(role);
CREATE TABLE IF NOT EXISTS api_tokens (
id TEXT PRIMARY KEY,
account_id TEXT NOT NULL,
name TEXT NOT NULL,
token_hash TEXT NOT NULL,
token_prefix TEXT NOT NULL,
permissions TEXT NOT NULL DEFAULT '[]',
last_used_at TIMESTAMPTZ,
expires_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
revoked_at TIMESTAMPTZ,
FOREIGN KEY (account_id) REFERENCES accounts(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_api_tokens_account ON api_tokens(account_id);
CREATE INDEX IF NOT EXISTS idx_api_tokens_hash ON api_tokens(token_hash);
CREATE TABLE IF NOT EXISTS roles (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
description TEXT,
permissions TEXT NOT NULL DEFAULT '[]',
is_system BOOLEAN NOT NULL DEFAULT FALSE,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS permission_templates (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
description TEXT,
permissions TEXT NOT NULL DEFAULT '[]',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS operation_logs (
id BIGSERIAL PRIMARY KEY,
account_id TEXT,
action TEXT NOT NULL,
target_type TEXT,
target_id TEXT,
details TEXT,
ip_address TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_op_logs_account ON operation_logs(account_id);
CREATE INDEX IF NOT EXISTS idx_op_logs_action ON operation_logs(action);
CREATE INDEX IF NOT EXISTS idx_op_logs_time ON operation_logs(created_at);
CREATE TABLE IF NOT EXISTS providers (
id TEXT PRIMARY KEY,
name TEXT NOT NULL UNIQUE,
display_name TEXT NOT NULL,
api_key TEXT,
base_url TEXT NOT NULL,
api_protocol TEXT NOT NULL DEFAULT 'openai',
enabled BOOLEAN NOT NULL DEFAULT TRUE,
rate_limit_rpm INTEGER,
rate_limit_tpm INTEGER,
config_json TEXT DEFAULT '{}',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS models (
id TEXT PRIMARY KEY,
provider_id TEXT NOT NULL,
model_id TEXT NOT NULL,
alias TEXT NOT NULL,
context_window BIGINT NOT NULL DEFAULT 8192,
max_output_tokens BIGINT NOT NULL DEFAULT 4096,
supports_streaming BOOLEAN NOT NULL DEFAULT TRUE,
supports_vision BOOLEAN NOT NULL DEFAULT FALSE,
enabled BOOLEAN NOT NULL DEFAULT TRUE,
pricing_input DOUBLE PRECISION DEFAULT 0,
pricing_output DOUBLE PRECISION DEFAULT 0,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE(provider_id, model_id),
FOREIGN KEY (provider_id) REFERENCES providers(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_models_provider ON models(provider_id);
CREATE TABLE IF NOT EXISTS account_api_keys (
id TEXT PRIMARY KEY,
account_id TEXT NOT NULL,
provider_id TEXT NOT NULL,
key_value TEXT NOT NULL,
key_label TEXT,
permissions TEXT NOT NULL DEFAULT '[]',
enabled BOOLEAN NOT NULL DEFAULT TRUE,
last_used_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
revoked_at TIMESTAMPTZ,
FOREIGN KEY (account_id) REFERENCES accounts(id) ON DELETE CASCADE,
FOREIGN KEY (provider_id) REFERENCES providers(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_account_api_keys_account ON account_api_keys(account_id);
CREATE TABLE IF NOT EXISTS usage_records (
id BIGSERIAL PRIMARY KEY,
account_id TEXT NOT NULL,
provider_id TEXT NOT NULL,
model_id TEXT NOT NULL,
input_tokens INTEGER NOT NULL DEFAULT 0,
output_tokens INTEGER NOT NULL DEFAULT 0,
latency_ms INTEGER,
status TEXT NOT NULL DEFAULT 'success',
error_message TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_usage_account ON usage_records(account_id);
CREATE INDEX IF NOT EXISTS idx_usage_time ON usage_records(created_at);
CREATE INDEX IF NOT EXISTS idx_usage_day ON usage_records((created_at::date));
CREATE TABLE IF NOT EXISTS relay_tasks (
id TEXT PRIMARY KEY,
account_id TEXT NOT NULL,
provider_id TEXT NOT NULL,
model_id TEXT NOT NULL,
request_hash TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'queued',
priority INTEGER NOT NULL DEFAULT 0,
attempt_count INTEGER NOT NULL DEFAULT 0,
max_attempts INTEGER NOT NULL DEFAULT 3,
request_body TEXT NOT NULL,
response_body TEXT,
input_tokens INTEGER DEFAULT 0,
output_tokens INTEGER DEFAULT 0,
error_message TEXT,
queued_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
started_at TIMESTAMPTZ,
completed_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_relay_status ON relay_tasks(status);
CREATE INDEX IF NOT EXISTS idx_relay_account ON relay_tasks(account_id);
CREATE INDEX IF NOT EXISTS idx_relay_provider ON relay_tasks(provider_id);
CREATE INDEX IF NOT EXISTS idx_relay_time ON relay_tasks(created_at);
CREATE INDEX IF NOT EXISTS idx_relay_day ON relay_tasks((created_at::date));
CREATE TABLE IF NOT EXISTS config_items (
id TEXT PRIMARY KEY,
category TEXT NOT NULL,
key_path TEXT NOT NULL,
value_type TEXT NOT NULL,
current_value TEXT,
default_value TEXT,
source TEXT NOT NULL DEFAULT 'local',
description TEXT,
requires_restart BOOLEAN NOT NULL DEFAULT FALSE,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE(category, key_path)
);
CREATE INDEX IF NOT EXISTS idx_config_category ON config_items(category);
CREATE TABLE IF NOT EXISTS config_sync_log (
id BIGSERIAL PRIMARY KEY,
account_id TEXT NOT NULL,
client_fingerprint TEXT NOT NULL,
action TEXT NOT NULL,
config_keys TEXT NOT NULL,
client_values TEXT,
saas_values TEXT,
resolution TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_sync_account ON config_sync_log(account_id);
CREATE TABLE IF NOT EXISTS devices (
id TEXT PRIMARY KEY,
account_id TEXT NOT NULL,
device_id TEXT NOT NULL,
device_name TEXT,
platform TEXT,
app_version TEXT,
last_seen_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
FOREIGN KEY (account_id) REFERENCES accounts(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_devices_account ON devices(account_id);
CREATE INDEX IF NOT EXISTS idx_devices_device_id ON devices(device_id);
CREATE UNIQUE INDEX IF NOT EXISTS idx_devices_unique ON devices(account_id, device_id);
-- Prompt template master table
CREATE TABLE IF NOT EXISTS prompt_templates (
id TEXT PRIMARY KEY,
name TEXT NOT NULL UNIQUE,
category TEXT NOT NULL,
description TEXT,
source TEXT NOT NULL DEFAULT 'builtin',
current_version INTEGER NOT NULL DEFAULT 1,
status TEXT NOT NULL DEFAULT 'active',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_prompt_status ON prompt_templates(status);
-- Prompt versions table (immutable)
CREATE TABLE IF NOT EXISTS prompt_versions (
id TEXT PRIMARY KEY,
template_id TEXT NOT NULL,
version INTEGER NOT NULL,
system_prompt TEXT,
user_prompt_template TEXT,
variables TEXT NOT NULL DEFAULT '[]',
changelog TEXT,
min_app_version TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE(template_id, version)
);
CREATE INDEX IF NOT EXISTS idx_prompt_ver_template ON prompt_versions(template_id);
-- Client prompt sync status
CREATE TABLE IF NOT EXISTS prompt_sync_status (
device_id TEXT NOT NULL,
template_id TEXT NOT NULL,
synced_version INTEGER NOT NULL,
synced_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY(device_id, template_id)
);
-- Provider Key Pool table
CREATE TABLE IF NOT EXISTS provider_keys (
id TEXT PRIMARY KEY,
provider_id TEXT NOT NULL,
key_label TEXT NOT NULL,
key_value TEXT NOT NULL,
priority INTEGER NOT NULL DEFAULT 0,
max_rpm INTEGER,
max_tpm INTEGER,
quota_reset_interval TEXT,
is_active BOOLEAN NOT NULL DEFAULT TRUE,
last_429_at TIMESTAMPTZ,
cooldown_until TIMESTAMPTZ,
total_requests BIGINT NOT NULL DEFAULT 0,
total_tokens BIGINT NOT NULL DEFAULT 0,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_pkeys_provider ON provider_keys(provider_id);
CREATE INDEX IF NOT EXISTS idx_pkeys_active ON provider_keys(provider_id, is_active);
-- Key usage sliding window
CREATE TABLE IF NOT EXISTS key_usage_window (
key_id TEXT NOT NULL,
window_minute TEXT NOT NULL,
request_count INTEGER NOT NULL DEFAULT 0,
token_count BIGINT NOT NULL DEFAULT 0,
PRIMARY KEY(key_id, window_minute)
);
-- Agent config template table
CREATE TABLE IF NOT EXISTS agent_templates (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
description TEXT,
category TEXT NOT NULL DEFAULT 'general',
source TEXT NOT NULL DEFAULT 'builtin',
model TEXT,
system_prompt TEXT,
tools TEXT NOT NULL DEFAULT '[]'::text,
capabilities TEXT NOT NULL DEFAULT '[]'::text,
temperature DOUBLE PRECISION,
max_tokens INTEGER,
visibility TEXT NOT NULL DEFAULT 'public',
status TEXT NOT NULL DEFAULT 'active',
current_version INTEGER NOT NULL DEFAULT 1,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_agent_tmpl_status ON agent_templates(status);
CREATE INDEX IF NOT EXISTS idx_agent_tmpl_visibility ON agent_templates(visibility);
-- Desktop telemetry report table (token usage statistics, no content)
CREATE TABLE IF NOT EXISTS telemetry_reports (
id TEXT PRIMARY KEY,
account_id TEXT NOT NULL,
device_id TEXT NOT NULL,
app_version TEXT,
model_id TEXT NOT NULL,
input_tokens BIGINT NOT NULL DEFAULT 0,
output_tokens BIGINT NOT NULL DEFAULT 0,
latency_ms INTEGER,
success BOOLEAN NOT NULL DEFAULT TRUE,
error_type TEXT,
connection_mode TEXT NOT NULL DEFAULT 'tauri',
reported_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
CREATE INDEX IF NOT EXISTS idx_telemetry_account ON telemetry_reports(account_id);
CREATE INDEX IF NOT EXISTS idx_telemetry_time ON telemetry_reports(reported_at);
CREATE INDEX IF NOT EXISTS idx_telemetry_model ON telemetry_reports(model_id);
CREATE INDEX IF NOT EXISTS idx_telemetry_day ON telemetry_reports((reported_at::date));
-- Refresh Token storage (single-use, JWT jti tracking)
CREATE TABLE IF NOT EXISTS refresh_tokens (
id TEXT PRIMARY KEY,
account_id TEXT NOT NULL,
jti TEXT NOT NULL UNIQUE,
token_hash TEXT NOT NULL,
expires_at TIMESTAMPTZ NOT NULL,
used_at TIMESTAMPTZ,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
FOREIGN KEY (account_id) REFERENCES accounts(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_refresh_account ON refresh_tokens(account_id);
CREATE INDEX IF NOT EXISTS idx_refresh_jti ON refresh_tokens(jti);
CREATE INDEX IF NOT EXISTS idx_refresh_expires ON refresh_tokens(expires_at);

View File

@@ -0,0 +1,9 @@
-- Migration: Seed roles (super_admin, admin, user)
-- Timestamps use NOW() to match TIMESTAMPTZ columns from initial schema.
INSERT INTO roles (id, name, description, permissions, is_system, created_at, updated_at)
VALUES
('super_admin', '超级管理员', '拥有所有权限', '["admin:full","account:admin","provider:manage","model:manage","relay:admin","config:write","prompt:read","prompt:write","prompt:publish","prompt:admin"]', TRUE, NOW(), NOW()),
('admin', '管理员', '管理账号和配置', '["account:read","account:admin","provider:manage","model:read","model:manage","relay:use","relay:admin","config:read","config:write","prompt:read","prompt:write","prompt:publish"]', TRUE, NOW(), NOW()),
('user', '普通用户', '基础使用权限', '["model:read","relay:use","config:read","prompt:read"]', TRUE, NOW(), NOW())
ON CONFLICT (id) DO NOTHING;

View File

@@ -82,10 +82,10 @@ pub struct AuthConfig {
pub struct RelayConfig { pub struct RelayConfig {
#[serde(default = "default_max_queue")] #[serde(default = "default_max_queue")]
pub max_queue_size: usize, pub max_queue_size: usize,
// TODO: implement per-provider concurrency limiting /// 每个 Provider 最大并发请求数 (预留,当前由 max_queue_size 控制)
#[serde(default = "default_max_concurrent")] #[serde(default = "default_max_concurrent")]
pub max_concurrent_per_provider: usize, pub max_concurrent_per_provider: usize,
// TODO: implement batch window /// 批量窗口间隔 (预留,用于请求合并优化)
#[serde(default = "default_batch_window")] #[serde(default = "default_batch_window")]
pub batch_window_ms: u64, pub batch_window_ms: u64,
#[serde(default = "default_retry_delay")] #[serde(default = "default_retry_delay")]

View File

@@ -4,354 +4,7 @@ use sqlx::postgres::PgPoolOptions;
use sqlx::PgPool; use sqlx::PgPool;
use crate::error::SaasResult; use crate::error::SaasResult;
const SCHEMA_VERSION: i32 = 5; const SCHEMA_VERSION: i32 = 6;
const SCHEMA_SQL: &str = r#"
CREATE TABLE IF NOT EXISTS saas_schema_version (
version INTEGER PRIMARY KEY
);
CREATE TABLE IF NOT EXISTS accounts (
id TEXT PRIMARY KEY,
username TEXT NOT NULL UNIQUE,
email TEXT NOT NULL UNIQUE,
password_hash TEXT NOT NULL,
display_name TEXT NOT NULL DEFAULT '',
avatar_url TEXT,
role TEXT NOT NULL DEFAULT 'user',
status TEXT NOT NULL DEFAULT 'active',
totp_secret TEXT,
totp_enabled BOOLEAN NOT NULL DEFAULT FALSE,
last_login_at TEXT,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_accounts_email ON accounts(email);
CREATE INDEX IF NOT EXISTS idx_accounts_role ON accounts(role);
CREATE TABLE IF NOT EXISTS api_tokens (
id TEXT PRIMARY KEY,
account_id TEXT NOT NULL,
name TEXT NOT NULL,
token_hash TEXT NOT NULL,
token_prefix TEXT NOT NULL,
permissions TEXT NOT NULL DEFAULT '[]',
last_used_at TEXT,
expires_at TEXT,
created_at TEXT NOT NULL,
revoked_at TEXT,
FOREIGN KEY (account_id) REFERENCES accounts(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_api_tokens_account ON api_tokens(account_id);
CREATE INDEX IF NOT EXISTS idx_api_tokens_hash ON api_tokens(token_hash);
CREATE TABLE IF NOT EXISTS roles (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
description TEXT,
permissions TEXT NOT NULL DEFAULT '[]',
is_system BOOLEAN NOT NULL DEFAULT FALSE,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS permission_templates (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
description TEXT,
permissions TEXT NOT NULL DEFAULT '[]',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS operation_logs (
id BIGSERIAL PRIMARY KEY,
account_id TEXT,
action TEXT NOT NULL,
target_type TEXT,
target_id TEXT,
details TEXT,
ip_address TEXT,
created_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_op_logs_account ON operation_logs(account_id);
CREATE INDEX IF NOT EXISTS idx_op_logs_action ON operation_logs(action);
CREATE INDEX IF NOT EXISTS idx_op_logs_time ON operation_logs(created_at);
CREATE TABLE IF NOT EXISTS providers (
id TEXT PRIMARY KEY,
name TEXT NOT NULL UNIQUE,
display_name TEXT NOT NULL,
api_key TEXT,
base_url TEXT NOT NULL,
api_protocol TEXT NOT NULL DEFAULT 'openai',
enabled BOOLEAN NOT NULL DEFAULT TRUE,
rate_limit_rpm INTEGER,
rate_limit_tpm INTEGER,
config_json TEXT DEFAULT '{}',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS models (
id TEXT PRIMARY KEY,
provider_id TEXT NOT NULL,
model_id TEXT NOT NULL,
alias TEXT NOT NULL,
context_window BIGINT NOT NULL DEFAULT 8192,
max_output_tokens BIGINT NOT NULL DEFAULT 4096,
supports_streaming BOOLEAN NOT NULL DEFAULT TRUE,
supports_vision BOOLEAN NOT NULL DEFAULT FALSE,
enabled BOOLEAN NOT NULL DEFAULT TRUE,
pricing_input DOUBLE PRECISION DEFAULT 0,
pricing_output DOUBLE PRECISION DEFAULT 0,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
UNIQUE(provider_id, model_id),
FOREIGN KEY (provider_id) REFERENCES providers(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_models_provider ON models(provider_id);
CREATE TABLE IF NOT EXISTS account_api_keys (
id TEXT PRIMARY KEY,
account_id TEXT NOT NULL,
provider_id TEXT NOT NULL,
key_value TEXT NOT NULL,
key_label TEXT,
permissions TEXT NOT NULL DEFAULT '[]',
enabled BOOLEAN NOT NULL DEFAULT TRUE,
last_used_at TEXT,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
revoked_at TEXT,
FOREIGN KEY (account_id) REFERENCES accounts(id) ON DELETE CASCADE,
FOREIGN KEY (provider_id) REFERENCES providers(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_account_api_keys_account ON account_api_keys(account_id);
CREATE TABLE IF NOT EXISTS usage_records (
id BIGSERIAL PRIMARY KEY,
account_id TEXT NOT NULL,
provider_id TEXT NOT NULL,
model_id TEXT NOT NULL,
input_tokens INTEGER NOT NULL DEFAULT 0,
output_tokens INTEGER NOT NULL DEFAULT 0,
latency_ms INTEGER,
status TEXT NOT NULL DEFAULT 'success',
error_message TEXT,
created_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_usage_account ON usage_records(account_id);
CREATE INDEX IF NOT EXISTS idx_usage_time ON usage_records(created_at);
CREATE TABLE IF NOT EXISTS relay_tasks (
id TEXT PRIMARY KEY,
account_id TEXT NOT NULL,
provider_id TEXT NOT NULL,
model_id TEXT NOT NULL,
request_hash TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'queued',
priority INTEGER NOT NULL DEFAULT 0,
attempt_count INTEGER NOT NULL DEFAULT 0,
max_attempts INTEGER NOT NULL DEFAULT 3,
request_body TEXT NOT NULL,
response_body TEXT,
input_tokens INTEGER DEFAULT 0,
output_tokens INTEGER DEFAULT 0,
error_message TEXT,
queued_at TEXT NOT NULL,
started_at TEXT,
completed_at TEXT,
created_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_relay_status ON relay_tasks(status);
CREATE INDEX IF NOT EXISTS idx_relay_account ON relay_tasks(account_id);
CREATE INDEX IF NOT EXISTS idx_relay_provider ON relay_tasks(provider_id);
CREATE TABLE IF NOT EXISTS config_items (
id TEXT PRIMARY KEY,
category TEXT NOT NULL,
key_path TEXT NOT NULL,
value_type TEXT NOT NULL,
current_value TEXT,
default_value TEXT,
source TEXT NOT NULL DEFAULT 'local',
description TEXT,
requires_restart BOOLEAN NOT NULL DEFAULT FALSE,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
UNIQUE(category, key_path)
);
CREATE INDEX IF NOT EXISTS idx_config_category ON config_items(category);
CREATE TABLE IF NOT EXISTS config_sync_log (
id BIGSERIAL PRIMARY KEY,
account_id TEXT NOT NULL,
client_fingerprint TEXT NOT NULL,
action TEXT NOT NULL,
config_keys TEXT NOT NULL,
client_values TEXT,
saas_values TEXT,
resolution TEXT,
created_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_sync_account ON config_sync_log(account_id);
CREATE TABLE IF NOT EXISTS devices (
id TEXT PRIMARY KEY,
account_id TEXT NOT NULL,
device_id TEXT NOT NULL,
device_name TEXT,
platform TEXT,
app_version TEXT,
last_seen_at TEXT NOT NULL,
created_at TEXT NOT NULL,
FOREIGN KEY (account_id) REFERENCES accounts(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_devices_account ON devices(account_id);
CREATE INDEX IF NOT EXISTS idx_devices_device_id ON devices(device_id);
CREATE UNIQUE INDEX IF NOT EXISTS idx_devices_unique ON devices(account_id, device_id);
-- 提示词模板主表
CREATE TABLE IF NOT EXISTS prompt_templates (
id TEXT PRIMARY KEY,
name TEXT NOT NULL UNIQUE,
category TEXT NOT NULL,
description TEXT,
source TEXT NOT NULL DEFAULT 'builtin',
current_version INTEGER NOT NULL DEFAULT 1,
status TEXT NOT NULL DEFAULT 'active',
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_prompt_status ON prompt_templates(status);
-- 提示词版本表(不可变)
CREATE TABLE IF NOT EXISTS prompt_versions (
id TEXT PRIMARY KEY,
template_id TEXT NOT NULL,
version INTEGER NOT NULL,
system_prompt TEXT,
user_prompt_template TEXT,
variables TEXT NOT NULL DEFAULT '[]',
changelog TEXT,
min_app_version TEXT,
created_at TEXT NOT NULL,
UNIQUE(template_id, version)
);
CREATE INDEX IF NOT EXISTS idx_prompt_ver_template ON prompt_versions(template_id);
-- 客户端提示词同步状态
CREATE TABLE IF NOT EXISTS prompt_sync_status (
device_id TEXT NOT NULL,
template_id TEXT NOT NULL,
synced_version INTEGER NOT NULL,
synced_at TEXT NOT NULL,
PRIMARY KEY(device_id, template_id)
);
-- Provider Key Pool 表
CREATE TABLE IF NOT EXISTS provider_keys (
id TEXT PRIMARY KEY,
provider_id TEXT NOT NULL,
key_label TEXT NOT NULL,
key_value TEXT NOT NULL,
priority INTEGER NOT NULL DEFAULT 0,
max_rpm INTEGER,
max_tpm INTEGER,
quota_reset_interval TEXT,
is_active BOOLEAN NOT NULL DEFAULT TRUE,
last_429_at TEXT,
cooldown_until TEXT,
total_requests BIGINT NOT NULL DEFAULT 0,
total_tokens BIGINT NOT NULL DEFAULT 0,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_pkeys_provider ON provider_keys(provider_id);
CREATE INDEX IF NOT EXISTS idx_pkeys_active ON provider_keys(provider_id, is_active);
-- Key 使用量滑动窗口
CREATE TABLE IF NOT EXISTS key_usage_window (
key_id TEXT NOT NULL,
window_minute TEXT NOT NULL,
request_count INTEGER NOT NULL DEFAULT 0,
token_count BIGINT NOT NULL DEFAULT 0,
PRIMARY KEY(key_id, window_minute)
);
-- Agent 配置模板表
CREATE TABLE IF NOT EXISTS agent_templates (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
description TEXT,
category TEXT NOT NULL DEFAULT 'general',
source TEXT NOT NULL DEFAULT 'builtin',
model TEXT,
system_prompt TEXT,
tools TEXT NOT NULL DEFAULT '[]'::text,
capabilities TEXT NOT NULL DEFAULT '[]'::text,
temperature DOUBLE PRECISION,
max_tokens INTEGER,
visibility TEXT NOT NULL DEFAULT 'public',
status TEXT NOT NULL DEFAULT 'active',
current_version INTEGER NOT NULL DEFAULT 1,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_agent_tmpl_status ON agent_templates(status);
CREATE INDEX IF NOT EXISTS idx_agent_tmpl_visibility ON agent_templates(visibility);
-- 桌面端遥测上报表Token 用量统计,无内容)
CREATE TABLE IF NOT EXISTS telemetry_reports (
id TEXT PRIMARY KEY,
account_id TEXT NOT NULL,
device_id TEXT NOT NULL,
app_version TEXT,
model_id TEXT NOT NULL,
input_tokens BIGINT NOT NULL DEFAULT 0,
output_tokens BIGINT NOT NULL DEFAULT 0,
latency_ms INTEGER,
success BOOLEAN NOT NULL DEFAULT TRUE,
error_type TEXT,
connection_mode TEXT NOT NULL DEFAULT 'tauri',
reported_at TEXT NOT NULL,
created_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_telemetry_account ON telemetry_reports(account_id);
CREATE INDEX IF NOT EXISTS idx_telemetry_time ON telemetry_reports(reported_at);
CREATE INDEX IF NOT EXISTS idx_telemetry_model ON telemetry_reports(model_id);
CREATE INDEX IF NOT EXISTS idx_telemetry_day ON telemetry_reports((SUBSTRING(reported_at, 1, 10)));
-- Refresh Token 存储 (一次性使用, JWT jti 追踪)
CREATE TABLE IF NOT EXISTS refresh_tokens (
id TEXT PRIMARY KEY,
account_id TEXT NOT NULL,
jti TEXT NOT NULL UNIQUE,
token_hash TEXT NOT NULL,
expires_at TEXT NOT NULL,
used_at TEXT,
created_at TEXT NOT NULL,
FOREIGN KEY (account_id) REFERENCES accounts(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_refresh_account ON refresh_tokens(account_id);
CREATE INDEX IF NOT EXISTS idx_refresh_jti ON refresh_tokens(jti);
CREATE INDEX IF NOT EXISTS idx_refresh_expires ON refresh_tokens(expires_at);
-- Performance: expression indexes for date-range queries on TEXT timestamp columns
CREATE INDEX IF NOT EXISTS idx_usage_day ON usage_records((SUBSTRING(created_at, 1, 10)));
CREATE INDEX IF NOT EXISTS idx_relay_day ON relay_tasks((SUBSTRING(created_at, 1, 10)));
CREATE INDEX IF NOT EXISTS idx_relay_time ON relay_tasks(created_at);
"#;
const SEED_ROLES: &str = r#"
INSERT INTO roles (id, name, description, permissions, is_system, created_at, updated_at)
VALUES
('super_admin', '超级管理员', '拥有所有权限', '["admin:full","account:admin","provider:manage","model:manage","relay:admin","config:write","prompt:read","prompt:write","prompt:publish","prompt:admin"]', TRUE, '2026-01-01T00:00:00+00:00', '2026-01-01T00:00:00+00:00'),
('admin', '管理员', '管理账号和配置', '["account:read","account:admin","provider:manage","model:read","model:manage","relay:use","relay:admin","config:read","config:write","prompt:read","prompt:write","prompt:publish"]', TRUE, '2026-01-01T00:00:00+00:00', '2026-01-01T00:00:00+00:00'),
('user', '普通用户', '基础使用权限', '["model:read","relay:use","config:read","prompt:read"]', TRUE, '2026-01-01T00:00:00+00:00', '2026-01-01T00:00:00+00:00')
ON CONFLICT (id) DO NOTHING;
"#;
/// 初始化数据库 /// 初始化数据库
pub async fn init_db(database_url: &str) -> SaasResult<PgPool> { pub async fn init_db(database_url: &str) -> SaasResult<PgPool> {
@@ -364,33 +17,105 @@ pub async fn init_db(database_url: &str) -> SaasResult<PgPool> {
.connect(database_url) .connect(database_url)
.await?; .await?;
// PostgreSQL 不支持在一个 prepared statement 中执行多条 SQL run_migrations(&pool).await?;
// 需要逐条执行
for stmt in SCHEMA_SQL.split(';') {
let trimmed = stmt.trim();
if !trimmed.is_empty() {
sqlx::query(trimmed).execute(&pool).await?;
}
}
sqlx::query("INSERT INTO saas_schema_version (version) VALUES ($1) ON CONFLICT DO NOTHING")
.bind(SCHEMA_VERSION)
.execute(&pool)
.await?;
for stmt in SEED_ROLES.split(';') {
let trimmed = stmt.trim();
if !trimmed.is_empty() {
sqlx::query(trimmed).execute(&pool).await?;
}
}
seed_admin_account(&pool).await?; seed_admin_account(&pool).await?;
seed_builtin_prompts(&pool).await?; seed_builtin_prompts(&pool).await?;
tracing::info!("Database initialized (schema v{})", SCHEMA_VERSION); tracing::info!("Database initialized (schema v{})", SCHEMA_VERSION);
Ok(pool) Ok(pool)
} }
/// 执行数据库迁移
///
/// 优先使用 migrations/ 目录下的 SQL 文件(支持 TIMESTAMPTZ
/// 如果不存在则回退到内联 schema向后兼容 TEXT 时间戳的旧数据库)。
async fn run_migrations(pool: &PgPool) -> SaasResult<()> {
// 检查是否已有 schema已有的数据库保持 TEXT 类型不变)
let existing_version: Option<i32> = sqlx::query_scalar(
"SELECT version FROM saas_schema_version ORDER BY version DESC LIMIT 1"
)
.fetch_optional(pool)
.await
.unwrap_or(None);
match existing_version {
Some(v) if v >= SCHEMA_VERSION => {
tracing::debug!("Schema already at v{}, no migration needed", v);
return Ok(());
}
Some(v) => {
tracing::info!("Schema at v{}, upgrading to v{}", v, SCHEMA_VERSION);
}
None => {
tracing::info!("No schema found, running initial migration");
}
}
// 尝试从 migrations 目录加载 SQL 文件
let migrations_dir = std::path::Path::new("crates/zclaw-saas/migrations");
if migrations_dir.exists() {
run_migration_files(pool, migrations_dir).await?;
} else {
// 回退:使用 migrations/ 的替代路径(开发环境可能在项目根目录)
let alt_dir = std::path::Path::new("migrations");
if alt_dir.exists() {
run_migration_files(pool, alt_dir).await?;
} else {
tracing::warn!("No migrations directory found, schema may be incomplete");
}
}
// 更新 schema 版本
sqlx::query("INSERT INTO saas_schema_version (version) VALUES ($1) ON CONFLICT DO NOTHING")
.bind(SCHEMA_VERSION)
.execute(pool)
.await?;
// Seed roles
seed_roles(pool).await?;
Ok(())
}
/// 从目录加载并执行迁移文件(按文件名排序)
async fn run_migration_files(pool: &PgPool, dir: &std::path::Path) -> SaasResult<()> {
let mut entries: Vec<std::path::PathBuf> = std::fs::read_dir(dir)?
.filter_map(|e| e.ok())
.map(|e| e.path())
.filter(|p| p.extension().map(|ext| ext == "sql").unwrap_or(false))
.collect();
entries.sort();
for path in &entries {
let filename = path.file_name().unwrap_or_default().to_string_lossy();
tracing::info!("Running migration: {}", filename);
let content = std::fs::read_to_string(path)?;
for stmt in content.split(';') {
let trimmed = stmt.trim();
if !trimmed.is_empty() && !trimmed.starts_with("--") {
sqlx::query(trimmed).execute(pool).await?;
}
}
}
Ok(())
}
/// Seed 角色数据
async fn seed_roles(pool: &PgPool) -> SaasResult<()> {
let now = chrono::Utc::now().to_rfc3339();
sqlx::query(
r#"INSERT INTO roles (id, name, description, permissions, is_system, created_at, updated_at)
VALUES
('super_admin', '超级管理员', '拥有所有权限', '["admin:full","account:admin","provider:manage","model:manage","relay:admin","config:write","prompt:read","prompt:write","prompt:publish","prompt:admin"]', TRUE, $1, $1),
('admin', '管理员', '管理账号和配置', '["account:read","account:admin","provider:manage","model:read","model:manage","relay:use","relay:admin","config:read","config:write","prompt:read","prompt:write","prompt:publish"]', TRUE, $1, $1),
('user', '普通用户', '基础使用权限', '["model:read","relay:use","config:read","prompt:read"]', TRUE, $1, $1)
ON CONFLICT (id) DO NOTHING"#
)
.bind(&now)
.execute(pool)
.await?;
Ok(())
}
/// 如果 accounts 表为空且环境变量已设置,自动创建 super_admin 账号 /// 如果 accounts 表为空且环境变量已设置,自动创建 super_admin 账号
/// 或者更新现有 admin 用户的角色为 super_admin /// 或者更新现有 admin 用户的角色为 super_admin
pub async fn seed_admin_account(pool: &PgPool) -> SaasResult<()> { pub async fn seed_admin_account(pool: &PgPool) -> SaasResult<()> {

View File

@@ -113,24 +113,30 @@ pub async fn update_task_status(
) -> SaasResult<()> { ) -> SaasResult<()> {
let now = chrono::Utc::now().to_rfc3339(); let now = chrono::Utc::now().to_rfc3339();
let update_sql = match status { match status {
"processing" => "started_at = $1, status = 'processing', attempt_count = attempt_count + 1", "processing" => {
"completed" => "completed_at = $1, status = 'completed', input_tokens = COALESCE($2, input_tokens), output_tokens = COALESCE($3, output_tokens)", sqlx::query(
"failed" => "completed_at = $1, status = 'failed', error_message = $2", "UPDATE relay_tasks SET started_at = $1, status = 'processing', attempt_count = attempt_count + 1 WHERE id = $2"
)
.bind(&now).bind(task_id)
.execute(db).await?;
}
"completed" => {
sqlx::query(
"UPDATE relay_tasks SET completed_at = $1, status = 'completed', input_tokens = COALESCE($2, input_tokens), output_tokens = COALESCE($3, output_tokens) WHERE id = $4"
)
.bind(&now).bind(input_tokens).bind(output_tokens).bind(task_id)
.execute(db).await?;
}
"failed" => {
sqlx::query(
"UPDATE relay_tasks SET completed_at = $1, status = 'failed', error_message = $2 WHERE id = $3"
)
.bind(&now).bind(error_message).bind(task_id)
.execute(db).await?;
}
_ => return Err(SaasError::InvalidInput(format!("无效任务状态: {}", status))), _ => return Err(SaasError::InvalidInput(format!("无效任务状态: {}", status))),
};
let sql = format!("UPDATE relay_tasks SET {} WHERE id = $4", update_sql);
let mut query = sqlx::query(&sql).bind(&now);
if status == "completed" {
query = query.bind(input_tokens).bind(output_tokens);
} }
if status == "failed" {
query = query.bind(error_message);
}
query = query.bind(task_id);
query.execute(db).await?;
Ok(()) Ok(())
} }

View File

@@ -60,4 +60,28 @@ impl AppState {
!entries.is_empty() !entries.is_empty()
}); });
} }
/// 异步派发操作日志到 Worker非阻塞
pub async fn dispatch_log_operation(
&self,
account_id: &str,
action: &str,
target_type: &str,
target_id: &str,
details: Option<serde_json::Value>,
ip_address: Option<&str>,
) {
use crate::workers::log_operation::LogOperationArgs;
let args = LogOperationArgs {
account_id: account_id.to_string(),
action: action.to_string(),
target_type: target_type.to_string(),
target_id: target_id.to_string(),
details: details.map(|d| d.to_string()),
ip_address: ip_address.map(|s| s.to_string()),
};
if let Err(e) = self.worker_dispatcher.dispatch("log_operation", args).await {
tracing::warn!("Failed to dispatch log_operation: {}", e);
}
}
} }